Skip to content

Commit

Permalink
adding continueAfterCopyWithOwner flag
Browse files Browse the repository at this point in the history
Signed-off-by: zhe <[email protected]>
  • Loading branch information
zhePin committed Jul 30, 2021
1 parent 1aeb87c commit 7e7722a
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 11 deletions.
3 changes: 2 additions & 1 deletion go/vt/vtctl/vtctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -2279,6 +2279,7 @@ func commandCreateLookupVindex(ctx context.Context, wr *wrangler.Wrangler, subFl
//TODO: keep -cell around for backward compatibility and remove it in a future version
cell := subFlags.String("cell", "", "Cell to replicate from.")
tabletTypes := subFlags.String("tablet_types", "", "Source tablet types to replicate from.")
continueAfterCopyWithOwner := subFlags.Bool("continue_after_copy_with_owner", false, "Vindex will continue materialization after copy when an owner is provided")
if err := subFlags.Parse(args); err != nil {
return err
}
Expand All @@ -2293,7 +2294,7 @@ func commandCreateLookupVindex(ctx context.Context, wr *wrangler.Wrangler, subFl
if err := json2.Unmarshal([]byte(subFlags.Arg(1)), specs); err != nil {
return err
}
return wr.CreateLookupVindex(ctx, keyspace, specs, *cell, *tabletTypes)
return wr.CreateLookupVindex(ctx, keyspace, specs, *cell, *tabletTypes, *continueAfterCopyWithOwner)
}

func commandExternalizeVindex(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
Expand Down
8 changes: 4 additions & 4 deletions go/vt/wrangler/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,8 +343,8 @@ func (wr *Wrangler) checkIfPreviousJournalExists(ctx context.Context, mz *materi
}

// CreateLookupVindex creates a lookup vindex and sets up the backfill.
func (wr *Wrangler) CreateLookupVindex(ctx context.Context, keyspace string, specs *vschemapb.Keyspace, cell, tabletTypes string) error {
ms, sourceVSchema, targetVSchema, err := wr.prepareCreateLookup(ctx, keyspace, specs)
func (wr *Wrangler) CreateLookupVindex(ctx context.Context, keyspace string, specs *vschemapb.Keyspace, cell, tabletTypes string, continueAfterCopyWithOwner bool) error {
ms, sourceVSchema, targetVSchema, err := wr.prepareCreateLookup(ctx, keyspace, specs, continueAfterCopyWithOwner)
if err != nil {
return err
}
Expand All @@ -364,7 +364,7 @@ func (wr *Wrangler) CreateLookupVindex(ctx context.Context, keyspace string, spe
}

// prepareCreateLookup performs the preparatory steps for creating a lookup vindex.
func (wr *Wrangler) prepareCreateLookup(ctx context.Context, keyspace string, specs *vschemapb.Keyspace) (ms *vtctldatapb.MaterializeSettings, sourceVSchema, targetVSchema *vschemapb.Keyspace, err error) {
func (wr *Wrangler) prepareCreateLookup(ctx context.Context, keyspace string, specs *vschemapb.Keyspace, continueAfterCopyWithOwner bool) (ms *vtctldatapb.MaterializeSettings, sourceVSchema, targetVSchema *vschemapb.Keyspace, err error) {
// Important variables are pulled out here.
var (
// lookup vindex info
Expand Down Expand Up @@ -617,7 +617,7 @@ func (wr *Wrangler) prepareCreateLookup(ctx context.Context, keyspace string, sp
Workflow: targetTableName + "_vdx",
SourceKeyspace: keyspace,
TargetKeyspace: targetKeyspace,
StopAfterCopy: vindex.Owner != "",
StopAfterCopy: vindex.Owner != "" && !continueAfterCopyWithOwner,
TableSettings: []*vtctldatapb.TableMaterializeSettings{{
TargetTable: targetTableName,
SourceExpression: materializeQuery,
Expand Down
88 changes: 82 additions & 6 deletions go/vt/wrangler/materializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ func TestCreateLookupVindexFull(t *testing.T) {
env.tmc.expectVRQuery(200, "update _vt.vreplication set state='Running' where db_name='vt_targetks' and workflow='lkp_vdx'", &sqltypes.Result{})

ctx := context.Background()
err := env.wr.CreateLookupVindex(ctx, ms.SourceKeyspace, specs, "cell", "MASTER")
err := env.wr.CreateLookupVindex(ctx, ms.SourceKeyspace, specs, "cell", "MASTER", false)
require.NoError(t, err)

wantvschema := &vschemapb.Keyspace{
Expand Down Expand Up @@ -566,7 +566,7 @@ func TestCreateLookupVindexCreateDDL(t *testing.T) {
delete(env.tmc.schema, ms.SourceKeyspace+".t1")
}

outms, _, _, err := env.wr.prepareCreateLookup(context.Background(), ms.SourceKeyspace, tcase.specs)
outms, _, _, err := env.wr.prepareCreateLookup(context.Background(), ms.SourceKeyspace, tcase.specs, false)
if tcase.err != "" {
if err == nil || !strings.Contains(err.Error(), tcase.err) {
t.Errorf("prepareCreateLookup(%s) err: %v, must contain %v", tcase.description, err, tcase.err)
Expand Down Expand Up @@ -808,7 +808,7 @@ func TestCreateLookupVindexSourceVSchema(t *testing.T) {
t.Fatal(err)
}

_, got, _, err := env.wr.prepareCreateLookup(context.Background(), ms.SourceKeyspace, specs)
_, got, _, err := env.wr.prepareCreateLookup(context.Background(), ms.SourceKeyspace, specs, false)
require.NoError(t, err)
if !proto.Equal(got, tcase.out) {
t.Errorf("%s: got:\n%v, want\n%v", tcase.description, got, tcase.out)
Expand Down Expand Up @@ -1041,7 +1041,7 @@ func TestCreateLookupVindexTargetVSchema(t *testing.T) {
t.Fatal(err)
}

_, _, got, err := env.wr.prepareCreateLookup(context.Background(), ms.SourceKeyspace, specs)
_, _, got, err := env.wr.prepareCreateLookup(context.Background(), ms.SourceKeyspace, specs, false)
if tcase.err != "" {
if err == nil || !strings.Contains(err.Error(), tcase.err) {
t.Errorf("prepareCreateLookup(%s) err: %v, must contain %v", tcase.description, err, tcase.err)
Expand Down Expand Up @@ -1156,13 +1156,89 @@ func TestCreateLookupVindexSameKeyspace(t *testing.T) {
t.Fatal(err)
}

_, got, _, err := env.wr.prepareCreateLookup(context.Background(), ms.SourceKeyspace, specs)
_, got, _, err := env.wr.prepareCreateLookup(context.Background(), ms.SourceKeyspace, specs, false)
require.NoError(t, err)
if !proto.Equal(got, want) {
t.Errorf("same keyspace: got:\n%v, want\n%v", got, want)
}
}

func TestStopAfterCopyFlag(t *testing.T) {
ms := &vtctldatapb.MaterializeSettings{
SourceKeyspace: "ks",
TargetKeyspace: "ks",
}
env := newTestMaterializerEnv(t, ms, []string{"0"}, []string{"0"})
defer env.close()
specs := &vschemapb.Keyspace{
Vindexes: map[string]*vschemapb.Vindex{
"v": {
Type: "lookup_unique",
Params: map[string]string{
"table": "ks.lkp",
"from": "c1",
"to": "col2",
},
Owner: "t1",
},
},
Tables: map[string]*vschemapb.Table{
"t1": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Name: "v",
Column: "col2",
}},
},
},
}
// Dummy sourceSchema
sourceSchema := "CREATE TABLE `t1` (\n" +
" `col1` int(11) NOT NULL AUTO_INCREMENT,\n" +
" `col2` int(11) DEFAULT NULL,\n" +
" PRIMARY KEY (`id`)\n" +
") ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=latin1"

vschema := &vschemapb.Keyspace{
Sharded: true,
Vindexes: map[string]*vschemapb.Vindex{
"hash": {
Type: "hash",
},
},
Tables: map[string]*vschemapb.Table{
"t1": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Name: "hash",
Column: "col1",
}},
},
},
}
env.tmc.schema[ms.SourceKeyspace+".t1"] = &tabletmanagerdatapb.SchemaDefinition{
TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{
Fields: []*querypb.Field{{
Name: "col1",
Type: querypb.Type_INT64,
}, {
Name: "col2",
Type: querypb.Type_INT64,
}},
Schema: sourceSchema,
}},
}
if err := env.topoServ.SaveVSchema(context.Background(), ms.SourceKeyspace, vschema); err != nil {
t.Fatal(err)
}

ms1, _, _, err := env.wr.prepareCreateLookup(context.Background(), ms.SourceKeyspace, specs, false)
require.NoError(t, err)
require.Equal(t, ms1.StopAfterCopy, true)

ms2, _, _, err := env.wr.prepareCreateLookup(context.Background(), ms.SourceKeyspace, specs, true)
require.NoError(t, err)
require.Equal(t, ms2.StopAfterCopy, false)
}

func TestCreateLookupVindexFailures(t *testing.T) {
topoServ := memorytopo.NewServer("cell")
wr := New(logutil.NewConsoleLogger(), topoServ, nil)
Expand Down Expand Up @@ -1422,7 +1498,7 @@ func TestCreateLookupVindexFailures(t *testing.T) {
err: "ColumnVindex for table t1 already exists: c1",
}}
for _, tcase := range testcases {
err := wr.CreateLookupVindex(context.Background(), "sourceks", tcase.input, "", "")
err := wr.CreateLookupVindex(context.Background(), "sourceks", tcase.input, "", "", false)
if !strings.Contains(err.Error(), tcase.err) {
t.Errorf("CreateLookupVindex(%s) err: %v, must contain %v", tcase.description, err, tcase.err)
}
Expand Down

0 comments on commit 7e7722a

Please sign in to comment.