Skip to content

Commit

Permalink
Merge pull request #6556 from planetscale/rn-update-reverse-workflow
Browse files Browse the repository at this point in the history
Reverse workflow: update cells/tablet_types
  • Loading branch information
deepthi authored Aug 17, 2020
2 parents b3294e8 + 0a63834 commit 02fa884
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 52 deletions.
59 changes: 46 additions & 13 deletions go/vt/wrangler/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ type trafficSwitcher struct {
targetKeyspace string
tables []string
sourceKSSchema *vindexes.KeyspaceSchema
optCells string //cells option passed to MoveTables/Reshard
optTabletTypes string //tabletTypes option passed to MoveTables/Reshard

}

// tsTarget contains the metadata for each migration target.
Expand Down Expand Up @@ -371,7 +374,7 @@ func (wr *Wrangler) DropSources(ctx context.Context, targetKeyspace, workflow st
}

func (wr *Wrangler) buildTrafficSwitcher(ctx context.Context, targetKeyspace, workflow string) (*trafficSwitcher, error) {
targets, frozen, err := wr.buildTargets(ctx, targetKeyspace, workflow)
targets, frozen, optCells, optTabletTypes, err := wr.buildTargets(ctx, targetKeyspace, workflow)
if err != nil {
return nil, err
}
Expand All @@ -385,6 +388,8 @@ func (wr *Wrangler) buildTrafficSwitcher(ctx context.Context, targetKeyspace, wo
sources: make(map[string]*tsSource),
targetKeyspace: targetKeyspace,
frozen: frozen,
optCells: optCells,
optTabletTypes: optTabletTypes,
}
ts.wr.Logger().Infof("Migration ID for workflow %s: %d", workflow, ts.id)

Expand Down Expand Up @@ -455,31 +460,31 @@ func (wr *Wrangler) buildTrafficSwitcher(ctx context.Context, targetKeyspace, wo
return ts, nil
}

func (wr *Wrangler) buildTargets(ctx context.Context, targetKeyspace, workflow string) (targets map[string]*tsTarget, frozen bool, err error) {
func (wr *Wrangler) buildTargets(ctx context.Context, targetKeyspace, workflow string) (targets map[string]*tsTarget, frozen bool, optCells string, optTabletTypes string, err error) {
targets = make(map[string]*tsTarget)
targetShards, err := wr.ts.GetShardNames(ctx, targetKeyspace)
if err != nil {
return nil, false, err
return nil, false, "", "", err
}
// We check all target shards. All of them may not have a stream.
// For example, if we're splitting -80 to -40,40-80, only those
// two target shards will have vreplication streams.
for _, targetShard := range targetShards {
targetsi, err := wr.ts.GetShard(ctx, targetKeyspace, targetShard)
if err != nil {
return nil, false, err
return nil, false, "", "", err
}
if targetsi.MasterAlias == nil {
// This can happen if bad inputs are given.
return nil, false, fmt.Errorf("shard %v:%v doesn't have a master set", targetKeyspace, targetShard)
return nil, false, "", "", fmt.Errorf("shard %v:%v doesn't have a master set", targetKeyspace, targetShard)
}
targetMaster, err := wr.ts.GetTablet(ctx, targetsi.MasterAlias)
if err != nil {
return nil, false, err
return nil, false, "", "", err
}
p3qr, err := wr.tmc.VReplicationExec(ctx, targetMaster.Tablet, fmt.Sprintf("select id, source, message from _vt.vreplication where workflow=%s and db_name=%s", encodeString(workflow), encodeString(targetMaster.DbName())))
p3qr, err := wr.tmc.VReplicationExec(ctx, targetMaster.Tablet, fmt.Sprintf("select id, source, message, cell, tablet_types from _vt.vreplication where workflow=%s and db_name=%s", encodeString(workflow), encodeString(targetMaster.DbName())))
if err != nil {
return nil, false, err
return nil, false, "", "", err
}
// If there's no vreplication stream, check the next target.
if len(p3qr.Rows) < 1 {
Expand All @@ -495,24 +500,26 @@ func (wr *Wrangler) buildTargets(ctx context.Context, targetKeyspace, workflow s
for _, row := range qr.Rows {
id, err := evalengine.ToInt64(row[0])
if err != nil {
return nil, false, err
return nil, false, "", "", err
}

var bls binlogdatapb.BinlogSource
if err := proto.UnmarshalText(row[1].ToString(), &bls); err != nil {
return nil, false, err
return nil, false, "", "", err
}
targets[targetShard].sources[uint32(id)] = &bls

if row[2].ToString() == frozenStr {
frozen = true
}
optCells = row[3].ToString()
optTabletTypes = row[4].ToString()
}
}
if len(targets) == 0 {
return nil, false, fmt.Errorf("no streams found in keyspace %s for: %s", targetKeyspace, workflow)
return nil, false, "", "", fmt.Errorf("no streams found in keyspace %s for: %s", targetKeyspace, workflow)
}
return targets, frozen, nil
return targets, frozen, optCells, optTabletTypes, nil
}

// hashStreams produces a reproducible hash based on the input parameters.
Expand Down Expand Up @@ -848,11 +855,37 @@ func (ts *trafficSwitcher) createReverseVReplication(ctx context.Context) error
}

_, err := ts.wr.VReplicationExec(ctx, source.master.Alias, binlogplayer.CreateVReplicationState(ts.reverseWorkflow, reverseBls, target.position, binlogplayer.BlpStopped, source.master.DbName()))
return err
if err != nil {
return err
}

// if user has defined the cell/tablet_types parameters in the forward workflow, update the reverse workflow as well
updateQuery := ts.getReverseVReplicationUpdateQuery(target.master.Alias.Cell, source.master.Alias.Cell, source.master.DbName())
if updateQuery != "" {
_, err = ts.wr.VReplicationExec(ctx, source.master.Alias, updateQuery)
return err
}
return nil
})
return err
}

func (ts *trafficSwitcher) getReverseVReplicationUpdateQuery(targetCell string, sourceCell string, dbname string) string {
// we try to be clever to understand what user intends:
// if target's cell is present in cells but not source's cell we replace it with the source's cell
if ts.optCells != "" && targetCell != sourceCell && strings.Contains(ts.optCells+",", targetCell+",") &&
!strings.Contains(ts.optCells+",", sourceCell+",") {
ts.optCells = strings.Replace(ts.optCells, targetCell, sourceCell, 1)
}

if ts.optCells != "" || ts.optTabletTypes != "" {
query := fmt.Sprintf("update _vt.vreplication set cell = '%s', tablet_types = '%s' where workflow = '%s' and db_name = '%s'",
ts.optCells, ts.optTabletTypes, ts.reverseWorkflow, dbname)
return query
}
return ""
}

func (ts *trafficSwitcher) deleteReverseVReplication(ctx context.Context) error {
return ts.forAllSources(func(source *tsSource) error {
query := fmt.Sprintf("delete from _vt.vreplication where db_name=%s and workflow=%s", encodeString(source.master.DbName()), encodeString(ts.reverseWorkflow))
Expand Down
16 changes: 8 additions & 8 deletions go/vt/wrangler/traffic_switcher_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ import (
"vitess.io/vitess/go/vt/vttablet/tmclient"
)

const vreplQueryks = "select id, source, message from _vt.vreplication where workflow='test' and db_name='vt_ks'"
const vreplQueryks2 = "select id, source, message from _vt.vreplication where workflow='test' and db_name='vt_ks2'"
const vreplQueryks = "select id, source, message, cell, tablet_types from _vt.vreplication where workflow='test' and db_name='vt_ks'"
const vreplQueryks2 = "select id, source, message, cell, tablet_types from _vt.vreplication where workflow='test' and db_name='vt_ks2'"

type testMigraterEnv struct {
ts *topo.Server
Expand Down Expand Up @@ -182,11 +182,11 @@ func newTestTableMigraterCustom(ctx context.Context, t *testing.T, sourceShards,
}},
},
}
rows = append(rows, fmt.Sprintf("%d|%v|", j+1, bls))
rows = append(rows, fmt.Sprintf("%d|%v|||", j+1, bls))
}
tme.dbTargetClients[i].addInvariant(vreplQueryks2, sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"id|source|message",
"int64|varchar|varchar"),
"id|source|message|cell|tablet_types",
"int64|varchar|varchar|varchar|varchar"),
rows...),
)
}
Expand Down Expand Up @@ -301,11 +301,11 @@ func newTestShardMigrater(ctx context.Context, t *testing.T, sourceShards, targe
}},
},
}
rows = append(rows, fmt.Sprintf("%d|%v|", j+1, bls))
rows = append(rows, fmt.Sprintf("%d|%v|||", j+1, bls))
}
tme.dbTargetClients[i].addInvariant(vreplQueryks, sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"id|source|message",
"int64|varchar|varchar"),
"id|source|message|cell|tablet_types",
"int64|varchar|varchar|varchar|varchar"),
rows...),
)
}
Expand Down
83 changes: 64 additions & 19 deletions go/vt/wrangler/traffic_switcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1455,9 +1455,9 @@ func TestMigrateFrozen(t *testing.T) {
},
}
tme.dbTargetClients[0].addQuery(vreplQueryks2, sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"id|source|message",
"int64|varchar|varchar"),
fmt.Sprintf("1|%v|FROZEN", bls1),
"id|source|message|cell|tablet_types",
"int64|varchar|varchar|varchar|varchar"),
fmt.Sprintf("1|%v|FROZEN||", bls1),
), nil)
tme.dbTargetClients[1].addQuery(vreplQueryks2, &sqltypes.Result{}, nil)

Expand All @@ -1468,9 +1468,9 @@ func TestMigrateFrozen(t *testing.T) {
}

tme.dbTargetClients[0].addQuery(vreplQueryks2, sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"id|source|message",
"int64|varchar|varchar"),
fmt.Sprintf("1|%v|FROZEN", bls1),
"id|source|message|cell|tablet_type",
"int64|varchar|varchar|varchar|varchar"),
fmt.Sprintf("1|%v|FROZEN||", bls1),
), nil)
tme.dbTargetClients[1].addQuery(vreplQueryks2, &sqltypes.Result{}, nil)

Expand Down Expand Up @@ -1515,9 +1515,9 @@ func TestMigrateDistinctSources(t *testing.T) {
},
}
tme.dbTargetClients[0].addQuery(vreplQueryks2, sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"id|source|message",
"int64|varchar|varchar"),
fmt.Sprintf("1|%v|", bls),
"id|source|message|cell|tablet_types",
"int64|varchar|varchar|varchar|varchar"),
fmt.Sprintf("1|%v|||", bls),
), nil)

_, err := tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", topodatapb.TabletType_RDONLY, nil, DirectionForward, false)
Expand All @@ -1543,9 +1543,9 @@ func TestMigrateMismatchedTables(t *testing.T) {
},
}
tme.dbTargetClients[0].addQuery(vreplQueryks2, sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"id|source|message",
"int64|varchar|varchar"),
fmt.Sprintf("1|%v|", bls)),
"id|source|message|cell|tablet_types",
"int64|varchar|varchar|varchar|varchar"),
fmt.Sprintf("1|%v|||", bls)),
nil,
)

Expand Down Expand Up @@ -1596,10 +1596,10 @@ func TestMigrateNoTableWildcards(t *testing.T) {
},
}
tme.dbTargetClients[0].addQuery(vreplQueryks2, sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"id|source|message",
"int64|varchar|varchar"),
fmt.Sprintf("1|%v|", bls1),
fmt.Sprintf("2|%v|", bls2),
"id|source|message|cell|tablet_types",
"int64|varchar|varchar|varchar|varchar"),
fmt.Sprintf("1|%v|||", bls1),
fmt.Sprintf("2|%v|||", bls2),
), nil)
bls3 := &binlogdatapb.BinlogSource{
Keyspace: "ks1",
Expand All @@ -1612,9 +1612,9 @@ func TestMigrateNoTableWildcards(t *testing.T) {
},
}
tme.dbTargetClients[1].addQuery(vreplQueryks2, sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"id|source|message",
"int64|varchar|varchar"),
fmt.Sprintf("1|%v|", bls3),
"id|source|message|cell|tablet_types",
"int64|varchar|varchar|varchar|varchar"),
fmt.Sprintf("1|%v|||", bls3),
), nil)

_, err := tme.wr.SwitchReads(ctx, tme.targetKeyspace, "test", topodatapb.TabletType_RDONLY, nil, DirectionForward, false)
Expand All @@ -1641,6 +1641,51 @@ func TestReverseName(t *testing.T) {
}
}

func TestReverseVReplicationUpdateQuery(t *testing.T) {
ts := &trafficSwitcher{
reverseWorkflow: "wf",
}
dbname := "db"
type tCase struct {
optCells string
optTabletTypes string
targetCell string
sourceCell string
want string
}
updateQuery := "update _vt.vreplication set cell = '%s', tablet_types = '%s' where workflow = 'wf' and db_name = 'db'"
tCases := []tCase{
{
targetCell: "cell1", sourceCell: "cell1", optCells: "cell1", optTabletTypes: "",
want: fmt.Sprintf(updateQuery, "cell1", ""),
},
{
targetCell: "cell1", sourceCell: "cell2", optCells: "cell1", optTabletTypes: "",
want: fmt.Sprintf(updateQuery, "cell2", ""),
},
{
targetCell: "cell1", sourceCell: "cell2", optCells: "cell2", optTabletTypes: "",
want: fmt.Sprintf(updateQuery, "cell2", ""),
},
{
targetCell: "cell1", sourceCell: "cell1", optCells: "cell1,cell2", optTabletTypes: "replica,master",
want: fmt.Sprintf(updateQuery, "cell1,cell2", "replica,master"),
},
{
targetCell: "cell1", sourceCell: "cell1", optCells: "", optTabletTypes: "replica,master",
want: fmt.Sprintf(updateQuery, "", "replica,master"),
},
}
for _, tc := range tCases {
t.Run("", func(t *testing.T) {
ts.optCells = tc.optCells
ts.optTabletTypes = tc.optTabletTypes
got := ts.getReverseVReplicationUpdateQuery(tc.targetCell, tc.sourceCell, dbname)
require.Equal(t, tc.want, got)
})
}
}

func checkRouting(t *testing.T, wr *Wrangler, want map[string][]string) {
t.Helper()
ctx := context.Background()
Expand Down
8 changes: 4 additions & 4 deletions go/vt/wrangler/vdiff_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func newTestVDiffEnv(sourceShards, targetShards []string, query string, position
}},
},
}
rows = append(rows, fmt.Sprintf("%d|%v|", j+1, bls))
rows = append(rows, fmt.Sprintf("%d|%v|||", j+1, bls))
position := vdiffStopPosition
if pos := positions[sourceShard+shard]; pos != "" {
position = pos
Expand All @@ -134,10 +134,10 @@ func newTestVDiffEnv(sourceShards, targetShards []string, query string, position
// migrater buildMigrationTargets
env.tmc.setVRResults(
master.tablet,
"select id, source, message from _vt.vreplication where workflow='vdiffTest' and db_name='vt_target'",
"select id, source, message, cell, tablet_types from _vt.vreplication where workflow='vdiffTest' and db_name='vt_target'",
sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"id|source|message",
"int64|varchar|varchar"),
"id|source|message|cell|tablet_types",
"int64|varchar|varchar|varchar|varchar"),
rows...,
),
)
Expand Down
8 changes: 4 additions & 4 deletions go/vt/wrangler/vexec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,13 @@ func TestVExec(t *testing.T) {
var result *sqltypes.Result
var testCases []*TestCase
result = sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"id|source|message",
"int64|varchar|varchar"),
"1|keyspace:\"source\" shard:\"0\" filter:<rules:<match:\"t1\" > >|",
"id|source|message|cell|tablet_types",
"int64|varchar|varchar|varchar|varchar"),
"1|keyspace:\"source\" shard:\"0\" filter:<rules:<match:\"t1\" > >|||",
)
testCases = append(testCases, &TestCase{
name: "select",
query: "select id, source, message from _vt.vreplication",
query: "select id, source, message, cell, tablet_types from _vt.vreplication",
result: result,
})
result = &sqltypes.Result{
Expand Down
8 changes: 4 additions & 4 deletions go/vt/wrangler/wrangler_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func newWranglerTestEnv(sourceShards, targetShards []string, query string, posit
}},
},
}
rows = append(rows, fmt.Sprintf("%d|%v|", j+1, bls))
rows = append(rows, fmt.Sprintf("%d|%v|||", j+1, bls))
position := testStopPosition
if pos := positions[sourceShard+shard]; pos != "" {
position = pos
Expand All @@ -130,10 +130,10 @@ func newWranglerTestEnv(sourceShards, targetShards []string, query string, posit
// migrater buildMigrationTargets
env.tmc.setVRResults(
master.tablet,
"select id, source, message from _vt.vreplication where db_name = 'vt_target' and workflow = 'wrWorkflow'",
"select id, source, message, cell, tablet_types from _vt.vreplication where db_name = 'vt_target' and workflow = 'wrWorkflow'",
sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"id|source|message",
"int64|varchar|varchar"),
"id|source|message|cell|tablet_types",
"int64|varchar|varchar|varchar|varchar"),
rows...,
),
)
Expand Down

0 comments on commit 02fa884

Please sign in to comment.