diff --git a/go/vt/wrangler/fake_dbclient_test.go b/go/vt/wrangler/fake_dbclient_test.go index 5ed43622fb0..d716e3ad79b 100644 --- a/go/vt/wrangler/fake_dbclient_test.go +++ b/go/vt/wrangler/fake_dbclient_test.go @@ -152,7 +152,7 @@ func (dc *fakeDBClient) ExecuteFetch(query string, maxrows int) (qr *sqltypes.Re } } - log.Infof("Missing query: >%s<" + query) + log.Infof("Missing query: >>>>>>>>>>>>>>>>>>%s<<<<<<<<<<<<<<<", query) return nil, fmt.Errorf("unexpected query: %s", query) } diff --git a/go/vt/wrangler/traffic_switcher.go b/go/vt/wrangler/traffic_switcher.go index 07192b363a7..6d909f535f7 100644 --- a/go/vt/wrangler/traffic_switcher.go +++ b/go/vt/wrangler/traffic_switcher.go @@ -423,11 +423,9 @@ func (wr *Wrangler) SwitchWrites(ctx context.Context, targetKeyspace, workflow s return 0, nil, err } if ts == nil { - if ts == nil { - errorMsg := fmt.Sprintf("workflow %s not found in keyspace %s", workflow, targetKeyspace) - wr.Logger().Errorf(errorMsg) - return 0, nil, fmt.Errorf(errorMsg) - } + errorMsg := fmt.Sprintf("workflow %s not found in keyspace %s", workflow, targetKeyspace) + wr.Logger().Errorf(errorMsg) + return 0, nil, fmt.Errorf(errorMsg) } var sw iswitcher diff --git a/go/vt/wrangler/traffic_switcher_env_test.go b/go/vt/wrangler/traffic_switcher_env_test.go index 72fc035bd9d..686ff2049f4 100644 --- a/go/vt/wrangler/traffic_switcher_env_test.go +++ b/go/vt/wrangler/traffic_switcher_env_test.go @@ -44,6 +44,7 @@ import ( const vreplQueryks = "select id, source, message, cell, tablet_types from _vt.vreplication where workflow='test' and db_name='vt_ks'" const vreplQueryks2 = "select id, source, message, cell, tablet_types from _vt.vreplication where workflow='test' and db_name='vt_ks2'" +const vreplQueryks1 = "select id, source, message, cell, tablet_types from _vt.vreplication where workflow='test_reverse' and db_name='vt_ks1'" type testMigraterEnv struct { ts *topo.Server @@ -193,6 +194,31 @@ func newTestTableMigraterCustom(ctx context.Context, t *testing.T, sourceShards, ) } + for i, sourceShard := range sourceShards { + var rows []string + for j, targetShard := range targetShards { + bls := &binlogdatapb.BinlogSource{ + Keyspace: "ks2", + Shard: targetShard, + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + Filter: fmt.Sprintf(fmtQuery, fmt.Sprintf("from t1 where in_keyrange('%s')", sourceShard)), + }, { + Match: "t2", + Filter: fmt.Sprintf(fmtQuery, fmt.Sprintf("from t2 where in_keyrange('%s')", sourceShard)), + }}, + }, + } + rows = append(rows, fmt.Sprintf("%d|%v|||", j+1, bls)) + } + tme.dbSourceClients[i].addInvariant(vreplQueryks1, sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "id|source|message|cell|tablet_types", + "int64|varchar|varchar|varchar|varchar"), + rows...), + ) + } + if err := tme.wr.saveRoutingRules(ctx, map[string][]string{ "t1": {"ks1.t1"}, "ks2.t1": {"ks1.t1"}, @@ -406,6 +432,13 @@ func (tme *testMigraterEnv) expectNoPreviousJournals() { } } +func (tme *testMigraterEnv) expectNoPreviousReverseJournals() { + // validate that no previous journals exist + for _, dbclient := range tme.dbTargetClients { + dbclient.addQueryRE(tsCheckJournals, &sqltypes.Result{}, nil) + } +} + func (tme *testShardMigraterEnv) forAllStreams(f func(i, j int)) { for i := range tme.targetShards { for j := range tme.sourceShards { diff --git a/go/vt/wrangler/workflow_test.go b/go/vt/wrangler/workflow_test.go index 8a92cbaf55e..b661ceff555 100644 --- a/go/vt/wrangler/workflow_test.go +++ b/go/vt/wrangler/workflow_test.go @@ -140,6 +140,7 @@ func expectCopyProgressQueries(t *testing.T, tme *testMigraterEnv) { db.AddQuery(query, result) } + func TestMoveTablesV2(t *testing.T) { ctx := context.Background() p := &VReplicationWorkflowParams{ @@ -162,6 +163,57 @@ func TestMoveTablesV2(t *testing.T) { tme.expectNoPreviousJournals() require.NoError(t, wf.SwitchTraffic(DirectionForward)) require.Equal(t, WorkflowStateAllSwitched, wf.CurrentState()) + + tme.expectNoPreviousJournals() + tme.expectNoPreviousReverseJournals() + require.NoError(t, wf.ReverseTraffic()) + require.Equal(t, WorkflowStateNotSwitched, wf.CurrentState()) +} + +func TestAbortMoveTablesV2(t *testing.T) { + ctx := context.Background() + p := &VReplicationWorkflowParams{ + Workflow: "test", + SourceKeyspace: "ks1", + TargetKeyspace: "ks2", + Tables: "t1,t2", + Cells: "cell1,cell2", + TabletTypes: "replica,rdonly,master", + Timeout: DefaultActionTimeout, + } + tme := newTestTableMigrater(ctx, t) + defer tme.stopTablets(t) + wf, err := tme.wr.NewVReplicationWorkflow(ctx, MoveTablesWorkflow, p) + require.NoError(t, err) + require.NotNil(t, wf) + require.Equal(t, WorkflowStateNotSwitched, wf.CurrentState()) + expectMoveTablesQueries(t, tme) + require.NoError(t, wf.Abort()) +} + +func TestReshardV2(t *testing.T) { + ctx := context.Background() + p := &VReplicationWorkflowParams{ + Workflow: "test", + SourceKeyspace: "ks1", + TargetKeyspace: "ks2", + SourceShards: []string{"-40", "40-"}, + TargetShards: []string{"-80", "80-"}, + Cells: "cell1,cell2", + TabletTypes: "replica,rdonly,master", + Timeout: DefaultActionTimeout, + } + tme := newTestTableMigrater(ctx, t) + defer tme.stopTablets(t) + wf, err := tme.wr.NewVReplicationWorkflow(ctx, ReshardWorkflow, p) + require.NoError(t, err) + require.NotNil(t, wf) + require.Equal(t, WorkflowStateNotSwitched, wf.CurrentState()) + tme.expectNoPreviousJournals() + expectMoveTablesQueries(t, tme) + tme.expectNoPreviousJournals() + require.NoError(t, wf.SwitchTraffic(DirectionForward)) + require.Equal(t, WorkflowStateAllSwitched, wf.CurrentState()) require.NoError(t, wf.Complete()) } @@ -186,11 +238,25 @@ func expectMoveTablesQueries(t *testing.T, tme *testMigraterEnv) { dbclient.addInvariant("select 1 from _vt.vreplication where db_name='vt_ks2' and workflow='test' and message!='FROZEN'", noResult) dbclient.addInvariant("delete from _vt.vreplication where id in (1)", noResult) dbclient.addInvariant("delete from _vt.copy_state where vrepl_id in (1)", noResult) - - // + dbclient.addInvariant("insert into _vt.resharding_journal", noResult) + dbclient.addInvariant("select val from _vt.resharding_journal", noResult) + dbclient.addInvariant("select id, source, message, cell, tablet_types from _vt.vreplication where workflow='test_reverse' and db_name='vt_ks1'", + sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "id|source|message|cell|tablet_types", + "int64|varchar|varchar|varchar|varchar"), + ""), + ) + //select pos, state, message from _vt.vreplication where id=1 } for _, dbclient := range tme.dbSourceClients { + dbclient.addInvariant("select val from _vt.resharding_journal", noResult) + dbclient.addInvariant("update _vt.vreplication set message = 'FROZEN'", noResult) + dbclient.addInvariant("insert into _vt.vreplication (workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state, db_name)", &sqltypes.Result{InsertID: uint64(1)}) + dbclient.addInvariant("update _vt.vreplication set state = 'Stopped', message = 'stopped for cutover' where id in (1)", noResult) + dbclient.addInvariant("update _vt.vreplication set state = 'Stopped', message = 'stopped for cutover' where id in (2)", noResult) + dbclient.addInvariant("select id from _vt.vreplication where id = 1", resultid1) + dbclient.addInvariant("select id from _vt.vreplication where id = 2", resultid2) dbclient.addInvariant("select id from _vt.vreplication where db_name = 'vt_ks1' and workflow = 'test_reverse'", resultid1) dbclient.addInvariant("delete from _vt.vreplication where id in (1)", noResult) dbclient.addInvariant("delete from _vt.copy_state where vrepl_id in (1)", noResult) @@ -204,10 +270,23 @@ func expectMoveTablesQueries(t *testing.T, tme *testMigraterEnv) { "varchar|varchar|varchar"), "MariaDB/5-456-892|Running", ) - tme.dbTargetClients[0].addQuery("select pos, state, message from _vt.vreplication where id=1", state, nil) - tme.dbTargetClients[0].addQuery("select pos, state, message from _vt.vreplication where id=2", state, nil) - tme.dbTargetClients[1].addQuery("select pos, state, message from _vt.vreplication where id=1", state, nil) - tme.dbTargetClients[1].addQuery("select pos, state, message from _vt.vreplication where id=2", state, nil) - tme.tmeDB.AddQueryPattern("drop table vt_ks1.t1", &sqltypes.Result{}) - tme.tmeDB.AddQueryPattern("drop table vt_ks1.t2", &sqltypes.Result{}) + tme.dbTargetClients[0].addInvariant("select pos, state, message from _vt.vreplication where id=1", state) + tme.dbTargetClients[0].addInvariant("select pos, state, message from _vt.vreplication where id=2", state) + tme.dbTargetClients[1].addInvariant("select pos, state, message from _vt.vreplication where id=1", state) + tme.dbTargetClients[1].addInvariant("select pos, state, message from _vt.vreplication where id=2", state) + + state = sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "pos|state|message", + "varchar|varchar|varchar"), + "MariaDB/5-456-893|Running", + ) + tme.dbSourceClients[0].addInvariant("select pos, state, message from _vt.vreplication where id=1", state) + tme.dbSourceClients[0].addInvariant("select pos, state, message from _vt.vreplication where id=2", state) + tme.dbSourceClients[1].addInvariant("select pos, state, message from _vt.vreplication where id=1", state) + tme.dbSourceClients[1].addInvariant("select pos, state, message from _vt.vreplication where id=2", state) + tme.tmeDB.AddQuery("drop table vt_ks1.t1", noResult) + tme.tmeDB.AddQuery("drop table vt_ks1.t2", noResult) + tme.tmeDB.AddQuery("drop table vt_ks2.t1", noResult) + tme.tmeDB.AddQuery("drop table vt_ks2.t2", noResult) + tme.tmeDB.AddQuery("update _vt.vreplication set message='Picked source tablet: cell:\"cell1\" uid:10 ' where id=1", noResult) }