Skip to content

Commit

Permalink
[release-16.0] MoveTables: add flag to specify that routing rules sho…
Browse files Browse the repository at this point in the history
…uld not be created when a movetables workflow is created (#13858)

Signed-off-by: Rohit Nayak <[email protected]>
  • Loading branch information
rohit-nayak-ps authored Aug 28, 2023
1 parent c934b2b commit d4e5cc7
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 36 deletions.
4 changes: 3 additions & 1 deletion go/vt/vtctl/vtctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ var commands = []commandGroup{
{
name: "MoveTables",
method: commandMoveTables,
params: "[--source=<sourceKs>] [--tables=<tableSpecs>] [--cells=<cells>] [--tablet_types=<source_tablet_types>] [--all] [--exclude=<tables>] [--auto_start] [--stop_after_copy] [--defer-secondary-keys] [--on-ddl=<ddl-action>] [--source_shards=<source_shards>] <action> 'action must be one of the following: Create, Complete, Cancel, SwitchTraffic, ReverseTrafffic, Show, or Progress' <targetKs.workflow>",
params: "[--source=<sourceKs>] [--tables=<tableSpecs>] [--cells=<cells>] [--tablet_types=<source_tablet_types>] [--all] [--exclude=<tables>] [--auto_start] [--stop_after_copy] [--defer-secondary-keys] [--on-ddl=<ddl-action>] [--source_shards=<source_shards>] [--no-routing-rules] <action> 'action must be one of the following: Create, Complete, Cancel, SwitchTraffic, ReverseTrafffic, Show, or Progress' <targetKs.workflow>",
help: `Move table(s) to another keyspace, table_specs is a list of tables or the tables section of the vschema for the target keyspace. Example: '{"t1":{"column_vindexes": [{"column": "id1", "name": "hash"}]}, "t2":{"column_vindexes": [{"column": "id2", "name": "hash"}]}}'. In the case of an unsharded target keyspace the vschema for each table may be empty. Example: '{"t1":{}, "t2":{}}'.`,
},
{
Expand Down Expand Up @@ -2125,6 +2125,7 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *pfl

// MoveTables-only params
renameTables := subFlags.Bool("rename_tables", false, "MoveTables only. Rename tables instead of dropping them. --rename_tables is only supported for Complete.")
noRoutingRules := subFlags.Bool("no-routing-rules", false, "(Advanced) MoveTables Create only. Do not create routing rules while creating the workflow. See the reference documentation for limitations if you use this flag.")

// MoveTables and Reshard params
sourceShards := subFlags.String("source_shards", "", "Source shards")
Expand Down Expand Up @@ -2262,6 +2263,7 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *pfl
vrwp.ExternalCluster = externalClusterName
vrwp.SourceTimeZone = *sourceTimeZone
vrwp.DropForeignKeys = *dropForeignKeys
vrwp.NoRoutingRules = *noRoutingRules
if *sourceShards != "" {
vrwp.SourceShards = strings.Split(*sourceShards, ",")
}
Expand Down
57 changes: 31 additions & 26 deletions go/vt/wrangler/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ func shouldInclude(table string, excludes []string) bool {
// MoveTables initiates moving table(s) over to another keyspace
func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, targetKeyspace, tableSpecs,
cell, tabletTypes string, allTables bool, excludeTables string, autoStart, stopAfterCopy bool,
externalCluster string, dropForeignKeys, deferSecondaryKeys bool, sourceTimeZone, onDDL string, sourceShards []string) error {
externalCluster string, dropForeignKeys, deferSecondaryKeys bool, sourceTimeZone, onDDL string,
sourceShards []string, noRoutingRules bool) error {
//FIXME validate tableSpecs, allTables, excludeTables
var tables []string
var externalTopo *topo.Server
Expand Down Expand Up @@ -206,33 +207,37 @@ func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, ta
}
}
if externalTopo == nil {
// Save routing rules before vschema. If we save vschema first, and routing rules
// fails to save, we may generate duplicate table errors.
rules, err := topotools.GetRoutingRules(ctx, wr.ts)
if err != nil {
return err
}
for _, table := range tables {
toSource := []string{sourceKeyspace + "." + table}
rules[table] = toSource
rules[table+"@replica"] = toSource
rules[table+"@rdonly"] = toSource
rules[targetKeyspace+"."+table] = toSource
rules[targetKeyspace+"."+table+"@replica"] = toSource
rules[targetKeyspace+"."+table+"@rdonly"] = toSource
rules[targetKeyspace+"."+table] = toSource
rules[sourceKeyspace+"."+table+"@replica"] = toSource
rules[sourceKeyspace+"."+table+"@rdonly"] = toSource
}
if err := topotools.SaveRoutingRules(ctx, wr.ts, rules); err != nil {
return err
}

if vschema != nil {
// We added to the vschema.
if err := wr.ts.SaveVSchema(ctx, targetKeyspace, vschema); err != nil {
if noRoutingRules {
log.Warningf("Found --no-routing-rules flag, not creating routing rules for workflow %s.%s", targetKeyspace, workflow)
} else {
// Save routing rules before vschema. If we save vschema first, and routing rules
// fails to save, we may generate duplicate table errors.
rules, err := topotools.GetRoutingRules(ctx, wr.ts)
if err != nil {
return err
}
for _, table := range tables {
toSource := []string{sourceKeyspace + "." + table}
rules[table] = toSource
rules[table+"@replica"] = toSource
rules[table+"@rdonly"] = toSource
rules[targetKeyspace+"."+table] = toSource
rules[targetKeyspace+"."+table+"@replica"] = toSource
rules[targetKeyspace+"."+table+"@rdonly"] = toSource
rules[targetKeyspace+"."+table] = toSource
rules[sourceKeyspace+"."+table+"@replica"] = toSource
rules[sourceKeyspace+"."+table+"@rdonly"] = toSource
}
if err := topotools.SaveRoutingRules(ctx, wr.ts, rules); err != nil {
return err
}

if vschema != nil {
// We added to the vschema.
if err := wr.ts.SaveVSchema(ctx, targetKeyspace, vschema); err != nil {
return err
}
}
}
}
if err := wr.ts.RebuildSrvVSchema(ctx, nil); err != nil {
Expand Down
44 changes: 36 additions & 8 deletions go/vt/wrangler/materializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,34 @@ const mzCheckJournal = "/select val from _vt.resharding_journal where id="

var defaultOnDDL = binlogdatapb.OnDDLAction_name[int32(binlogdatapb.OnDDLAction_IGNORE)]

// TestMoveTablesNoRoutingRules confirms that MoveTables does not create routing rules if --no-routing-rules is specified.
func TestMoveTablesNoRoutingRules(t *testing.T) {
ms := &vtctldatapb.MaterializeSettings{
Workflow: "workflow",
SourceKeyspace: "sourceks",
TargetKeyspace: "targetks",
TableSettings: []*vtctldatapb.TableMaterializeSettings{{
TargetTable: "t1",
SourceExpression: "select * from t1",
}},
}
env := newTestMaterializerEnv(t, ms, []string{"0"}, []string{"0"})
defer env.close()

env.tmc.expectVRQuery(100, mzCheckJournal, &sqltypes.Result{})
env.tmc.expectVRQuery(200, mzSelectFrozenQuery, &sqltypes.Result{})
env.tmc.expectVRQuery(200, insertPrefix, &sqltypes.Result{})
env.tmc.expectVRQuery(200, mzSelectIDQuery, &sqltypes.Result{})
env.tmc.expectVRQuery(200, mzUpdateQuery, &sqltypes.Result{})

ctx := context.Background()
err := env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1", "", "", false, "", true, false, "", false, false, "", defaultOnDDL, nil, true)
require.NoError(t, err)
rr, err := env.wr.ts.GetRoutingRules(ctx)
require.NoError(t, err)
require.Equal(t, 0, len(rr.Rules))
}

func TestMigrateTables(t *testing.T) {
ms := &vtctldatapb.MaterializeSettings{
Workflow: "workflow",
Expand All @@ -66,7 +94,7 @@ func TestMigrateTables(t *testing.T) {
env.tmc.expectVRQuery(200, mzUpdateQuery, &sqltypes.Result{})

ctx := context.Background()
err := env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1", "", "", false, "", true, false, "", false, false, "", defaultOnDDL, nil)
err := env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1", "", "", false, "", true, false, "", false, false, "", defaultOnDDL, nil, false)
require.NoError(t, err)
vschema, err := env.wr.ts.GetSrvVSchema(ctx, env.cell)
require.NoError(t, err)
Expand Down Expand Up @@ -107,11 +135,11 @@ func TestMissingTables(t *testing.T) {
env.tmc.expectVRQuery(200, mzUpdateQuery, &sqltypes.Result{})

ctx := context.Background()
err := env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1,tyt", "", "", false, "", true, false, "", false, false, "", defaultOnDDL, nil)
err := env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1,tyt", "", "", false, "", true, false, "", false, false, "", defaultOnDDL, nil, false)
require.EqualError(t, err, "table(s) not found in source keyspace sourceks: tyt")
err = env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1,tyt,t2,txt", "", "", false, "", true, false, "", false, false, "", defaultOnDDL, nil)
err = env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1,tyt,t2,txt", "", "", false, "", true, false, "", false, false, "", defaultOnDDL, nil, false)
require.EqualError(t, err, "table(s) not found in source keyspace sourceks: tyt,txt")
err = env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1", "", "", false, "", true, false, "", false, false, "", defaultOnDDL, nil)
err = env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1", "", "", false, "", true, false, "", false, false, "", defaultOnDDL, nil, false)
require.NoError(t, err)
}

Expand Down Expand Up @@ -167,7 +195,7 @@ func TestMoveTablesAllAndExclude(t *testing.T) {
env.tmc.expectVRQuery(200, insertPrefix, &sqltypes.Result{})
env.tmc.expectVRQuery(200, mzSelectIDQuery, &sqltypes.Result{})
env.tmc.expectVRQuery(200, mzUpdateQuery, &sqltypes.Result{})
err = env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "", "", "", tcase.allTables, tcase.excludeTables, true, false, "", false, false, "", defaultOnDDL, nil)
err = env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "", "", "", tcase.allTables, tcase.excludeTables, true, false, "", false, false, "", defaultOnDDL, nil, false)
require.NoError(t, err)
require.EqualValues(t, tcase.want, targetTables(env))
})
Expand Down Expand Up @@ -201,7 +229,7 @@ func TestMoveTablesStopFlags(t *testing.T) {
env.tmc.expectVRQuery(200, mzSelectIDQuery, &sqltypes.Result{})
// -auto_start=false is tested by NOT expecting the update query which sets state to RUNNING
err = env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1", "",
"", false, "", false, true, "", false, false, "", defaultOnDDL, nil)
"", false, "", false, true, "", false, false, "", defaultOnDDL, nil, false)
require.NoError(t, err)
env.tmc.verifyQueries(t)
})
Expand All @@ -227,7 +255,7 @@ func TestMigrateVSchema(t *testing.T) {
env.tmc.expectVRQuery(200, mzUpdateQuery, &sqltypes.Result{})

ctx := context.Background()
err := env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", `{"t1":{}}`, "", "", false, "", true, false, "", false, false, "", defaultOnDDL, nil)
err := env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", `{"t1":{}}`, "", "", false, "", true, false, "", false, false, "", defaultOnDDL, nil, false)
require.NoError(t, err)
vschema, err := env.wr.ts.GetSrvVSchema(ctx, env.cell)
require.NoError(t, err)
Expand Down Expand Up @@ -2828,7 +2856,7 @@ func TestMoveTablesDDLFlag(t *testing.T) {
env.tmc.expectVRQuery(200, mzUpdateQuery, &sqltypes.Result{})

err := env.wr.MoveTables(ctx, "workflow", "sourceks", "targetks", "t1", "",
"", false, "", false, true, "", false, false, "", onDDLAction, nil)
"", false, "", false, true, "", false, false, "", onDDLAction, nil, false)
require.NoError(t, err)
})
}
Expand Down
5 changes: 4 additions & 1 deletion go/vt/wrangler/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ type VReplicationWorkflowParams struct {

// Migrate specific
ExternalCluster string

// MoveTables only
NoRoutingRules bool
}

// VReplicationWorkflow stores various internal objects for a workflow
Expand Down Expand Up @@ -433,7 +436,7 @@ func (vrw *VReplicationWorkflow) initMoveTables() error {
return vrw.wr.MoveTables(vrw.ctx, vrw.params.Workflow, vrw.params.SourceKeyspace, vrw.params.TargetKeyspace,
vrw.params.Tables, vrw.params.Cells, vrw.params.TabletTypes, vrw.params.AllTables, vrw.params.ExcludeTables,
vrw.params.AutoStart, vrw.params.StopAfterCopy, vrw.params.ExternalCluster, vrw.params.DropForeignKeys,
vrw.params.DeferSecondaryKeys, vrw.params.SourceTimeZone, vrw.params.OnDDL, vrw.params.SourceShards)
vrw.params.DeferSecondaryKeys, vrw.params.SourceTimeZone, vrw.params.OnDDL, vrw.params.SourceShards, vrw.params.NoRoutingRules)
}

func (vrw *VReplicationWorkflow) initReshard() error {
Expand Down

0 comments on commit d4e5cc7

Please sign in to comment.