Skip to content

Commit

Permalink
Merge pull request #13515 from planetscale/partial-movetables-traffic-status
Browse files Browse the repository at this point in the history

VReplication: Ignore unrelated shards in partial MoveTables traffic state
  • Loading branch information
mattlord authored Jul 17, 2023
2 parents d7704d4 + 902a91c commit 3af6955
Show file tree
Hide file tree
Showing 5 changed files with 261 additions and 25 deletions.
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletmanager/rpc_vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func TestUpdateVRWorkflow(t *testing.T) {
OnDdl: binlogdatapb.OnDDLAction_EXEC,
},
query: fmt.Sprintf(`update _vt.vreplication set source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}} on_ddl:%s', cell = '', tablet_types = '' where id in (%d)`,
keyspace, shard, binlogdatapb.OnDDLAction_name[int32(binlogdatapb.OnDDLAction_EXEC)], vreplID),
keyspace, shard, binlogdatapb.OnDDLAction_EXEC.String(), vreplID),
},
{
name: "update cell,tablet_types,on_ddl",
Expand All @@ -161,7 +161,7 @@ func TestUpdateVRWorkflow(t *testing.T) {
OnDdl: binlogdatapb.OnDDLAction_EXEC_IGNORE,
},
query: fmt.Sprintf(`update _vt.vreplication set source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}} on_ddl:%s', cell = '%s', tablet_types = '%s' where id in (%d)`,
keyspace, shard, binlogdatapb.OnDDLAction_name[int32(binlogdatapb.OnDDLAction_EXEC_IGNORE)], "zone1,zone2,zone3", "rdonly,replica,primary", vreplID),
keyspace, shard, binlogdatapb.OnDDLAction_EXEC_IGNORE.String(), "zone1,zone2,zone3", "rdonly,replica,primary", vreplID),
},
}

Expand Down
4 changes: 2 additions & 2 deletions go/vt/wrangler/materializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ const mzSelectIDQuery = "select id from _vt.vreplication where db_name='vt_targe
const mzSelectFrozenQuery = "select 1 from _vt.vreplication where db_name='vt_targetks' and message='FROZEN' and workflow_sub_type != 1"
const mzCheckJournal = "/select val from _vt.resharding_journal where id="

var defaultOnDDL = binlogdatapb.OnDDLAction_name[int32(binlogdatapb.OnDDLAction_IGNORE)]
var defaultOnDDL = binlogdatapb.OnDDLAction_IGNORE.String()

func TestMigrateTables(t *testing.T) {
ms := &vtctldatapb.MaterializeSettings{
Expand Down Expand Up @@ -2825,7 +2825,7 @@ func TestMoveTablesDDLFlag(t *testing.T) {

env.tmc.expectVRQuery(100, mzCheckJournal, &sqltypes.Result{})
env.tmc.expectVRQuery(200, mzSelectFrozenQuery, &sqltypes.Result{})
if onDDLAction == binlogdatapb.OnDDLAction_name[int32(binlogdatapb.OnDDLAction_IGNORE)] {
if onDDLAction == binlogdatapb.OnDDLAction_IGNORE.String() {
// This is the default and go does not marshal defaults
// for prototext fields so we use the default insert stmt.
env.tmc.expectVRQuery(200, insertPrefix, &sqltypes.Result{})
Expand Down
41 changes: 24 additions & 17 deletions go/vt/wrangler/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,8 @@ func (wr *Wrangler) getWorkflowState(ctx context.Context, targetKeyspace, workfl
}

var (
reverse bool
keyspace string
reverse bool
sourceKeyspace string
)

// We reverse writes by using the source_keyspace.workflowname_reverse workflow
Expand All @@ -229,17 +229,19 @@ func (wr *Wrangler) getWorkflowState(ctx context.Context, targetKeyspace, workfl
// source to check if writes have been switched.
if strings.HasSuffix(workflowName, "_reverse") {
reverse = true
keyspace = state.SourceKeyspace
// Flip the source and target keyspaces.
sourceKeyspace = state.TargetKeyspace
targetKeyspace = state.SourceKeyspace
workflowName = workflow.ReverseWorkflowName(workflowName)
} else {
keyspace = targetKeyspace
sourceKeyspace = state.SourceKeyspace
}
if ts.MigrationType() == binlogdatapb.MigrationType_TABLES {
state.WorkflowType = workflow.TypeMoveTables

// We assume a consistent state, so only choose routing rule for one table.
if len(ts.Tables()) == 0 {
return nil, nil, fmt.Errorf("no tables in workflow %s.%s", keyspace, workflowName)
return nil, nil, fmt.Errorf("no tables in workflow %s.%s", targetKeyspace, workflowName)

}
table := ts.Tables()[0]
Expand All @@ -252,19 +254,22 @@ func (wr *Wrangler) getWorkflowState(ctx context.Context, targetKeyspace, workfl

rules := shardRoutingRules.Rules
for _, rule := range rules {
if rule.ToKeyspace == ts.SourceKeyspaceName() {
switch rule.ToKeyspace {
case sourceKeyspace:
state.ShardsNotYetSwitched = append(state.ShardsNotYetSwitched, rule.Shard)
} else {
case targetKeyspace:
state.ShardsAlreadySwitched = append(state.ShardsAlreadySwitched, rule.Shard)
default:
// Not a relevant rule.
}
}
} else {
state.RdonlyCellsSwitched, state.RdonlyCellsNotSwitched, err = ws.GetCellsWithTableReadsSwitched(ctx, keyspace, table, topodatapb.TabletType_RDONLY)
state.RdonlyCellsSwitched, state.RdonlyCellsNotSwitched, err = ws.GetCellsWithTableReadsSwitched(ctx, targetKeyspace, table, topodatapb.TabletType_RDONLY)
if err != nil {
return nil, nil, err
}

state.ReplicaCellsSwitched, state.ReplicaCellsNotSwitched, err = ws.GetCellsWithTableReadsSwitched(ctx, keyspace, table, topodatapb.TabletType_REPLICA)
state.ReplicaCellsSwitched, state.ReplicaCellsNotSwitched, err = ws.GetCellsWithTableReadsSwitched(ctx, targetKeyspace, table, topodatapb.TabletType_REPLICA)
if err != nil {
return nil, nil, err
}
Expand All @@ -275,7 +280,7 @@ func (wr *Wrangler) getWorkflowState(ctx context.Context, targetKeyspace, workfl
for _, table := range ts.Tables() {
rr := globalRules[table]
// if a rule exists for the table and points to the target keyspace, writes have been switched
if len(rr) > 0 && rr[0] == fmt.Sprintf("%s.%s", keyspace, table) {
if len(rr) > 0 && rr[0] == fmt.Sprintf("%s.%s", targetKeyspace, table) {
state.WritesSwitched = true
break
}
Expand All @@ -292,12 +297,12 @@ func (wr *Wrangler) getWorkflowState(ctx context.Context, targetKeyspace, workfl
shard = ts.SourceShards()[0]
}

state.RdonlyCellsSwitched, state.RdonlyCellsNotSwitched, err = ws.GetCellsWithShardReadsSwitched(ctx, keyspace, shard, topodatapb.TabletType_RDONLY)
state.RdonlyCellsSwitched, state.RdonlyCellsNotSwitched, err = ws.GetCellsWithShardReadsSwitched(ctx, targetKeyspace, shard, topodatapb.TabletType_RDONLY)
if err != nil {
return nil, nil, err
}

state.ReplicaCellsSwitched, state.ReplicaCellsNotSwitched, err = ws.GetCellsWithShardReadsSwitched(ctx, keyspace, shard, topodatapb.TabletType_REPLICA)
state.ReplicaCellsSwitched, state.ReplicaCellsNotSwitched, err = ws.GetCellsWithShardReadsSwitched(ctx, targetKeyspace, shard, topodatapb.TabletType_REPLICA)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -330,11 +335,13 @@ func (wr *Wrangler) SwitchReads(ctx context.Context, targetKeyspace, workflowNam
if servedType != topodatapb.TabletType_REPLICA && servedType != topodatapb.TabletType_RDONLY {
return nil, fmt.Errorf("tablet type must be REPLICA or RDONLY: %v", servedType)
}
if direction == workflow.DirectionBackward && servedType == topodatapb.TabletType_REPLICA && len(ws.ReplicaCellsSwitched) == 0 {
return nil, fmt.Errorf("requesting reversal of read traffic for REPLICAs but REPLICA reads have not been switched")
}
if direction == workflow.DirectionBackward && servedType == topodatapb.TabletType_RDONLY && len(ws.RdonlyCellsSwitched) == 0 {
return nil, fmt.Errorf("requesting reversal of SwitchReads for RDONLYs but RDONLY reads have not been switched")
if !ts.isPartialMigration { // shard level traffic switching is all or nothing
if direction == workflow.DirectionBackward && servedType == topodatapb.TabletType_REPLICA && len(ws.ReplicaCellsSwitched) == 0 {
return nil, fmt.Errorf("requesting reversal of read traffic for REPLICAs but REPLICA reads have not been switched")
}
if direction == workflow.DirectionBackward && servedType == topodatapb.TabletType_RDONLY && len(ws.RdonlyCellsSwitched) == 0 {
return nil, fmt.Errorf("requesting reversal of SwitchReads for RDONLYs but RDONLY reads have not been switched")
}
}
switch servedType {
case topodatapb.TabletType_REPLICA:
Expand Down
151 changes: 151 additions & 0 deletions go/vt/wrangler/traffic_switcher_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"
"golang.org/x/sync/semaphore"

"vitess.io/vitess/go/mysql"
Expand Down Expand Up @@ -263,6 +264,156 @@ func newTestTableMigraterCustom(ctx context.Context, t *testing.T, sourceShards,
return tme
}

// newTestTablePartialMigrater creates a test tablet migrater
// specifically for partial or shard by shard migrations.
// The shards must be the same on the source and target, and we
// must be moving a subset of them.
// fmtQuery should be of the form: 'select a, b %s group by a'.
// The test will Sprintf a from clause and where clause as needed.
func newTestTablePartialMigrater(ctx context.Context, t *testing.T, shards, shardsToMove []string, fmtQuery string) *testMigraterEnv {
// Partial (shard by shard) migrations only make sense on a sharded keyspace.
require.Greater(t, len(shards), 1, "shard by shard migrations can only be done on sharded keyspaces")
tme := &testMigraterEnv{}
tme.ts = memorytopo.NewServer("cell1", "cell2")
tme.wr = New(logutil.NewConsoleLogger(), tme.ts, tmclient.NewTabletManagerClient())
tme.wr.sem = semaphore.NewWeighted(1)
// Source and target keyspaces deliberately share the same shard set.
tme.sourceShards = shards
tme.targetShards = shards
tme.tmeDB = fakesqldb.New(t)
expectVDiffQueries(tme.tmeDB)
tabletID := 10
// Create one fake PRIMARY tablet per source shard in keyspace ks1,
// spacing tablet IDs by 10.
for _, shard := range tme.sourceShards {
tme.sourcePrimaries = append(tme.sourcePrimaries, newFakeTablet(t, tme.wr, "cell1", uint32(tabletID), topodatapb.TabletType_PRIMARY, tme.tmeDB, TabletKeyspaceShard(t, "ks1", shard)))
tabletID += 10

_, sourceKeyRange, err := topo.ValidateShardName(shard)
if err != nil {
t.Fatal(err)
}
tme.sourceKeyRanges = append(tme.sourceKeyRanges, sourceKeyRange)
}
// Pin the tablet picker to the first source primary so tablet
// selection in the test is deterministic.
tpChoiceTablet := tme.sourcePrimaries[0].Tablet
tpChoice = &testTabletPickerChoice{
keyspace: tpChoiceTablet.Keyspace,
shard: tpChoiceTablet.Shard,
}
// Create one fake PRIMARY tablet per target shard in keyspace ks2,
// continuing the tablet ID sequence.
for _, shard := range tme.targetShards {
tme.targetPrimaries = append(tme.targetPrimaries, newFakeTablet(t, tme.wr, "cell1", uint32(tabletID), topodatapb.TabletType_PRIMARY, tme.tmeDB, TabletKeyspaceShard(t, "ks2", shard)))
tabletID += 10

_, targetKeyRange, err := topo.ValidateShardName(shard)
if err != nil {
t.Fatal(err)
}
tme.targetKeyRanges = append(tme.targetKeyRanges, targetKeyRange)
}

// Both keyspaces use the same vschema: tables t1 and t2, each
// sharded with a hash vindex on column c1.
vs := &vschemapb.Keyspace{
Sharded: true,
Vindexes: map[string]*vschemapb.Vindex{
"hash": {
Type: "hash",
},
},
Tables: map[string]*vschemapb.Table{
"t1": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Column: "c1",
Name: "hash",
}},
},
"t2": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Column: "c1",
Name: "hash",
}},
},
},
}
err := tme.ts.SaveVSchema(ctx, "ks1", vs)
require.NoError(t, err)
err = tme.ts.SaveVSchema(ctx, "ks2", vs)
require.NoError(t, err)
err = tme.ts.RebuildSrvVSchema(ctx, nil)
require.NoError(t, err)
err = topotools.RebuildKeyspace(ctx, logutil.NewConsoleLogger(), tme.ts, "ks1", []string{"cell1"}, false)
require.NoError(t, err)
err = topotools.RebuildKeyspace(ctx, logutil.NewConsoleLogger(), tme.ts, "ks2", []string{"cell1"}, false)
require.NoError(t, err)

tme.startTablets(t)
tme.createDBClients(ctx, t)
tme.setPrimaryPositions()
now := time.Now().Unix()

// Register the expected forward-stream (ks1 -> ks2) vreplication
// rows on each target tablet. Only shards listed in shardsToMove
// get a stream row; other shards get empty results.
// NOTE(review): streamInfoRows/streamExtInfoRows are redeclared on
// every shardsToMove iteration and the addInvariant calls run per
// iteration too, so with multiple shardsToMove the later calls
// re-register the same query keys — presumably intentional for the
// fake DB client; confirm against addInvariant semantics.
for i, shard := range shards {
for _, shardToMove := range shardsToMove {
var streamInfoRows []string
var streamExtInfoRows []string
if shardToMove == shard {
bls := &binlogdatapb.BinlogSource{
Keyspace: "ks1",
Shard: shard,
Filter: &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "t1",
Filter: fmt.Sprintf(fmtQuery, fmt.Sprintf("from t1 where in_keyrange('%s')", shard)),
}, {
Match: "t2",
Filter: fmt.Sprintf(fmtQuery, fmt.Sprintf("from t2 where in_keyrange('%s')", shard)),
}},
},
}
streamInfoRows = append(streamInfoRows, fmt.Sprintf("%d|%v|||", i+1, bls))
streamExtInfoRows = append(streamExtInfoRows, fmt.Sprintf("%d|||||Running|vt_ks1|%d|%d|0|0||||0", i+1, now, now))
}
tme.dbTargetClients[i].addInvariant(fmt.Sprintf(copyStateQuery, i+1, i+1), noResult)
tme.dbTargetClients[i].addInvariant(streamInfoKs2, sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"id|source|message|cell|tablet_types",
"int64|varchar|varchar|varchar|varchar"),
streamInfoRows...))
tme.dbTargetClients[i].addInvariant(streamExtInfoKs2, sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"id|source|pos|stop_pos|max_replication_lag|state|db_name|time_updated|transaction_timestamp|time_heartbeat|time_throttled|component_throttled|message|tags|workflow_type|workflow_sub_type|defer_secondary_keys",
"int64|varchar|int64|int64|int64|varchar|varchar|int64|int64|int64|int64|int64|varchar|varchar|int64|int64|int64"),
streamExtInfoRows...))
tme.dbTargetClients[i].addInvariant(reverseStreamExtInfoKs2, sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"id|source|pos|stop_pos|max_replication_lag|state|db_name|time_updated|transaction_timestamp|time_heartbeat|time_throttled|component_throttled|message|tags|workflow_type|workflow_sub_type|defer_secondary_keys",
"int64|varchar|int64|int64|int64|varchar|varchar|int64|int64|int64|int64|int64|varchar|varchar|int64|int64|int64"),
streamExtInfoRows...))
}
}

// Register the expected reverse-stream (ks2 -> ks1) vreplication
// rows on each source tablet, again only for shards being moved.
for i, shard := range shards {
for _, shardToMove := range shardsToMove {
var streamInfoRows []string
if shardToMove == shard {
bls := &binlogdatapb.BinlogSource{
Keyspace: "ks2",
Shard: shard,
Filter: &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "t1",
Filter: fmt.Sprintf(fmtQuery, fmt.Sprintf("from t1 where in_keyrange('%s')", shard)),
}, {
Match: "t2",
Filter: fmt.Sprintf(fmtQuery, fmt.Sprintf("from t2 where in_keyrange('%s')", shard)),
}},
},
}
streamInfoRows = append(streamInfoRows, fmt.Sprintf("%d|%v|||", i+1, bls))
tme.dbTargetClients[i].addInvariant(fmt.Sprintf(copyStateQuery, i+1, i+1), noResult)
}
tme.dbSourceClients[i].addInvariant(reverseStreamInfoKs1, sqltypes.MakeTestResult(sqltypes.MakeTestFields(
"id|source|message|cell|tablet_types",
"int64|varchar|varchar|varchar|varchar"),
streamInfoRows...),
)
}
}

tme.targetKeyspace = "ks2"
return tme
}

func newTestShardMigrater(ctx context.Context, t *testing.T, sourceShards, targetShards []string) *testShardMigraterEnv {
tme := &testShardMigraterEnv{}
tme.ts = memorytopo.NewServer("cell1", "cell2")
Expand Down
Loading

0 comments on commit 3af6955

Please sign in to comment.