Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

migrater: handle tables based on the vschema info #5315

Merged
merged 3 commits into from
Oct 19, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 14 additions & 12 deletions go/vt/wrangler/migrater.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type migrater struct {
sourceKeyspace string
targetKeyspace string
tables []string
sourceKSSchema *vindexes.KeyspaceSchema
sourceWorkflows []string
}

Expand Down Expand Up @@ -117,6 +118,7 @@ func (wr *Wrangler) MigrateReads(ctx context.Context, targetKeyspace, workflow s
mi.wr.Logger().Errorf("migrateTableReads failed: %v", err)
return err
}
return nil
}
if err := mi.migrateShardReads(ctx, cells, servedType, direction); err != nil {
mi.wr.Logger().Errorf("migrateShardReads failed: %v", err)
Expand Down Expand Up @@ -292,6 +294,14 @@ func (wr *Wrangler) buildMigrater(ctx context.Context, targetKeyspace, workflow
}
}
}
vs, err := mi.wr.ts.GetVSchema(ctx, mi.sourceKeyspace)
if err != nil {
return nil, err
}
mi.sourceKSSchema, err = vindexes.BuildKeyspaceSchema(vs, mi.sourceKeyspace)
if err != nil {
return nil, err
}
return mi, nil
}

Expand Down Expand Up @@ -678,14 +688,6 @@ func (mi *migrater) createJournals(ctx context.Context) error {
}

func (mi *migrater) createReverseReplication(ctx context.Context) error {
vs, err := mi.wr.ts.GetVSchema(ctx, mi.sourceKeyspace)
if err != nil {
return err
}
ksschema, err := vindexes.BuildKeyspaceSchema(vs, mi.sourceKeyspace)
if err != nil {
return err
}
return mi.forAllUids(func(target *miTarget, uid uint32) error {
bls := target.sources[uid]
source := mi.sources[bls.Shard]
Expand All @@ -698,19 +700,19 @@ func (mi *migrater) createReverseReplication(ctx context.Context) error {
for _, rule := range bls.Filter.Rules {
var filter string
if strings.HasPrefix(rule.Match, "/") {
if ksschema.Keyspace.Sharded {
if mi.sourceKSSchema.Keyspace.Sharded {
filter = bls.Shard
}
} else {
var inKeyrange string
if ksschema.Keyspace.Sharded {
vtable, ok := ksschema.Tables[rule.Match]
if mi.sourceKSSchema.Keyspace.Sharded {
vtable, ok := mi.sourceKSSchema.Tables[rule.Match]
if !ok {
return fmt.Errorf("table %s not found in vschema", rule.Match)
}
// TODO(sougou): handle degenerate cases like sequence, etc.
// We currently assume the primary vindex is the best way to filter, which may not be true.
inKeyrange = fmt.Sprintf(" where in_keyrange(%s, '%s', '%s')", sqlparser.String(vtable.ColumnVindexes[0].Columns[0]), vs.Vindexes[vtable.ColumnVindexes[0].Name].Type, bls.Shard)
inKeyrange = fmt.Sprintf(" where in_keyrange(%s, '%s', '%s')", sqlparser.String(vtable.ColumnVindexes[0].Columns[0]), vtable.ColumnVindexes[0].Type, bls.Shard)
}
filter = fmt.Sprintf("select * from %s%s", rule.Match, inKeyrange)
}
Expand Down
30 changes: 29 additions & 1 deletion go/vt/wrangler/migrater_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"vitess.io/vitess/go/vt/logutil"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/proto/vschema"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/memorytopo"
Expand Down Expand Up @@ -197,7 +198,34 @@ func newTestShardMigrater(ctx context.Context, t *testing.T, sourceShards, targe
tme.targetKeyRanges = append(tme.targetKeyRanges, targetKeyRange)
}

vs := &vschemapb.Keyspace{Sharded: true}
vs := &vschemapb.Keyspace{
Sharded: true,
Vindexes: map[string]*vschema.Vindex{
"thash": {
Type: "hash",
},
},
Tables: map[string]*vschema.Table{
"t1": {
ColumnVindexes: []*vschema.ColumnVindex{{
Columns: []string{"c1"},
Name: "thash",
}},
},
"t2": {
ColumnVindexes: []*vschema.ColumnVindex{{
Columns: []string{"c1"},
Name: "thash",
}},
},
"t3": {
ColumnVindexes: []*vschema.ColumnVindex{{
Columns: []string{"c1"},
Name: "thash",
}},
},
},
}
if err := tme.ts.SaveVSchema(ctx, "ks", vs); err != nil {
t.Fatal(err)
}
Expand Down
57 changes: 36 additions & 21 deletions go/vt/wrangler/stream_migrater.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"vitess.io/vitess/go/vt/throttler"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/vindexes"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication"
)

Expand Down Expand Up @@ -367,37 +368,38 @@ func (sm *streamMigrater) templatize(ctx context.Context, tabletStreams []*vrStr
}

func (sm *streamMigrater) templatizeRule(ctx context.Context, rule *binlogdatapb.Rule) (int, error) {
vtable, ok := sm.mi.sourceKSSchema.Tables[rule.Match]
if !ok {
return 0, fmt.Errorf("table %v not found in vschema", rule.Match)
}
if vtable.Type == vindexes.TypeReference {
return reference, nil
}
switch {
case rule.Filter == "":
return reference, nil
return unknown, fmt.Errorf("rule %v does not have a select expression in vreplication", rule)
case key.IsKeyRange(rule.Filter):
rule.Filter = "{{.}}"
return sharded, nil
case rule.Filter == vreplication.ExcludeStr:
return unknown, nil
return unknown, fmt.Errorf("unexpected rule in vreplication: %v", rule)
default:
templatized, err := sm.templatizeQuery(ctx, rule.Filter)
err := sm.templatizeKeyRange(ctx, rule)
if err != nil {
return unknown, err
}
if templatized != "" {
rule.Filter = templatized
return sharded, nil
}
return reference, nil
return sharded, nil
}
}

// templatizeQuery converts the underlying in_keyrange subexpression to
// a template to allow for new keyrange values to be substituted.
func (sm *streamMigrater) templatizeQuery(ctx context.Context, query string) (string, error) {
statement, err := sqlparser.Parse(query)
func (sm *streamMigrater) templatizeKeyRange(ctx context.Context, rule *binlogdatapb.Rule) error {
statement, err := sqlparser.Parse(rule.Filter)
if err != nil {
return "", err
return err
}
sel, ok := statement.(*sqlparser.Select)
if !ok {
return "", fmt.Errorf("unexpected query: %v", query)
return fmt.Errorf("unexpected query: %v", rule.Filter)
}
var expr sqlparser.Expr
if sel.Where != nil {
Expand All @@ -416,23 +418,36 @@ func (sm *streamMigrater) templatizeQuery(ctx context.Context, query string) (st
case 3:
krExpr = funcExpr.Exprs[2]
default:
return "", fmt.Errorf("unexpected in_keyrange parameters: %v", sqlparser.String(funcExpr))
return fmt.Errorf("unexpected in_keyrange parameters: %v", sqlparser.String(funcExpr))
}
aliased, ok := krExpr.(*sqlparser.AliasedExpr)
if !ok {
return "", fmt.Errorf("unexpected in_keyrange parameters: %v", sqlparser.String(funcExpr))
return fmt.Errorf("unexpected in_keyrange parameters: %v", sqlparser.String(funcExpr))
}
val, ok := aliased.Expr.(*sqlparser.SQLVal)
if !ok {
return "", fmt.Errorf("unexpected in_keyrange parameters: %v", sqlparser.String(funcExpr))
return fmt.Errorf("unexpected in_keyrange parameters: %v", sqlparser.String(funcExpr))
}
if strings.Contains(query, "{{") {
return "", fmt.Errorf("cannot migrate queries that contain '{{' in their string: %s", query)
if strings.Contains(rule.Filter, "{{") {
return fmt.Errorf("cannot migrate queries that contain '{{' in their string: %s", rule.Filter)
}
val.Val = []byte("{{.}}")
return sqlparser.String(statement), nil
rule.Filter = sqlparser.String(statement)
return nil
}
return "", nil
// There was no in_keyrange expression. Create a new one.
vtable := sm.mi.sourceKSSchema.Tables[rule.Match]
inkr := &sqlparser.FuncExpr{
Name: sqlparser.NewColIdent("in_keyrange"),
Exprs: sqlparser.SelectExprs{
&sqlparser.AliasedExpr{Expr: &sqlparser.ColName{Name: vtable.ColumnVindexes[0].Columns[0]}},
&sqlparser.AliasedExpr{Expr: sqlparser.NewStrVal([]byte(vtable.ColumnVindexes[0].Type))},
&sqlparser.AliasedExpr{Expr: sqlparser.NewStrVal([]byte("{{.}}"))},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add some comments about the use of {{.}}?

},
}
sel.AddWhere(inkr)
rule.Filter = sqlparser.String(statement)
return nil
}

func (sm *streamMigrater) createTargetStreams(ctx context.Context, tmpl []*vrStream) error {
Expand Down
Loading