From 7e7722ac7b51aa62c42b81a7e679e8acb1c15384 Mon Sep 17 00:00:00 2001 From: zhe Date: Fri, 30 Jul 2021 18:45:58 -0400 Subject: [PATCH] adding continueAfterCopyWithOwner flag Signed-off-by: zhe --- go/vt/vtctl/vtctl.go | 3 +- go/vt/wrangler/materializer.go | 8 +-- go/vt/wrangler/materializer_test.go | 88 +++++++++++++++++++++++++++-- 3 files changed, 88 insertions(+), 11 deletions(-) diff --git a/go/vt/vtctl/vtctl.go b/go/vt/vtctl/vtctl.go index 73bd789f9cf..e9285f3f6bd 100644 --- a/go/vt/vtctl/vtctl.go +++ b/go/vt/vtctl/vtctl.go @@ -2279,6 +2279,7 @@ func commandCreateLookupVindex(ctx context.Context, wr *wrangler.Wrangler, subFl //TODO: keep -cell around for backward compatibility and remove it in a future version cell := subFlags.String("cell", "", "Cell to replicate from.") tabletTypes := subFlags.String("tablet_types", "", "Source tablet types to replicate from.") + continueAfterCopyWithOwner := subFlags.Bool("continue_after_copy_with_owner", false, "Vindex will continue materialization after copy when an owner is provided") if err := subFlags.Parse(args); err != nil { return err } @@ -2293,7 +2294,7 @@ func commandCreateLookupVindex(ctx context.Context, wr *wrangler.Wrangler, subFl if err := json2.Unmarshal([]byte(subFlags.Arg(1)), specs); err != nil { return err } - return wr.CreateLookupVindex(ctx, keyspace, specs, *cell, *tabletTypes) + return wr.CreateLookupVindex(ctx, keyspace, specs, *cell, *tabletTypes, *continueAfterCopyWithOwner) } func commandExternalizeVindex(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error { diff --git a/go/vt/wrangler/materializer.go b/go/vt/wrangler/materializer.go index 0418df15a1e..94badc65fdf 100644 --- a/go/vt/wrangler/materializer.go +++ b/go/vt/wrangler/materializer.go @@ -343,8 +343,8 @@ func (wr *Wrangler) checkIfPreviousJournalExists(ctx context.Context, mz *materi } // CreateLookupVindex creates a lookup vindex and sets up the backfill. -func (wr *Wrangler) CreateLookupVindex(ctx context.Context, keyspace string, specs *vschemapb.Keyspace, cell, tabletTypes string) error { - ms, sourceVSchema, targetVSchema, err := wr.prepareCreateLookup(ctx, keyspace, specs) +func (wr *Wrangler) CreateLookupVindex(ctx context.Context, keyspace string, specs *vschemapb.Keyspace, cell, tabletTypes string, continueAfterCopyWithOwner bool) error { + ms, sourceVSchema, targetVSchema, err := wr.prepareCreateLookup(ctx, keyspace, specs, continueAfterCopyWithOwner) if err != nil { return err } @@ -364,7 +364,7 @@ func (wr *Wrangler) CreateLookupVindex(ctx context.Context, keyspace string, spe } // prepareCreateLookup performs the preparatory steps for creating a lookup vindex. -func (wr *Wrangler) prepareCreateLookup(ctx context.Context, keyspace string, specs *vschemapb.Keyspace) (ms *vtctldatapb.MaterializeSettings, sourceVSchema, targetVSchema *vschemapb.Keyspace, err error) { +func (wr *Wrangler) prepareCreateLookup(ctx context.Context, keyspace string, specs *vschemapb.Keyspace, continueAfterCopyWithOwner bool) (ms *vtctldatapb.MaterializeSettings, sourceVSchema, targetVSchema *vschemapb.Keyspace, err error) { // Important variables are pulled out here. var ( // lookup vindex info @@ -617,7 +617,7 @@ func (wr *Wrangler) prepareCreateLookup(ctx context.Context, keyspace string, sp Workflow: targetTableName + "_vdx", SourceKeyspace: keyspace, TargetKeyspace: targetKeyspace, - StopAfterCopy: vindex.Owner != "", + StopAfterCopy: vindex.Owner != "" && !continueAfterCopyWithOwner, TableSettings: []*vtctldatapb.TableMaterializeSettings{{ TargetTable: targetTableName, SourceExpression: materializeQuery, diff --git a/go/vt/wrangler/materializer_test.go b/go/vt/wrangler/materializer_test.go index 4034f92d639..720858c9348 100644 --- a/go/vt/wrangler/materializer_test.go +++ b/go/vt/wrangler/materializer_test.go @@ -317,7 +317,7 @@ func TestCreateLookupVindexFull(t *testing.T) { env.tmc.expectVRQuery(200, "update _vt.vreplication set state='Running' where db_name='vt_targetks' and workflow='lkp_vdx'", &sqltypes.Result{}) ctx := context.Background() - err := env.wr.CreateLookupVindex(ctx, ms.SourceKeyspace, specs, "cell", "MASTER") + err := env.wr.CreateLookupVindex(ctx, ms.SourceKeyspace, specs, "cell", "MASTER", false) require.NoError(t, err) wantvschema := &vschemapb.Keyspace{ @@ -566,7 +566,7 @@ func TestCreateLookupVindexCreateDDL(t *testing.T) { delete(env.tmc.schema, ms.SourceKeyspace+".t1") } - outms, _, _, err := env.wr.prepareCreateLookup(context.Background(), ms.SourceKeyspace, tcase.specs) + outms, _, _, err := env.wr.prepareCreateLookup(context.Background(), ms.SourceKeyspace, tcase.specs, false) if tcase.err != "" { if err == nil || !strings.Contains(err.Error(), tcase.err) { t.Errorf("prepareCreateLookup(%s) err: %v, must contain %v", tcase.description, err, tcase.err) @@ -808,7 +808,7 @@ func TestCreateLookupVindexSourceVSchema(t *testing.T) { t.Fatal(err) } - _, got, _, err := env.wr.prepareCreateLookup(context.Background(), ms.SourceKeyspace, specs) + _, got, _, err := env.wr.prepareCreateLookup(context.Background(), ms.SourceKeyspace, specs, false) require.NoError(t, err) if !proto.Equal(got, tcase.out) { t.Errorf("%s: got:\n%v, want\n%v", tcase.description, got, tcase.out) @@ -1041,7 +1041,7 @@ func TestCreateLookupVindexTargetVSchema(t *testing.T) { t.Fatal(err) } - _, _, got, err := env.wr.prepareCreateLookup(context.Background(), ms.SourceKeyspace, specs) + _, _, got, err := env.wr.prepareCreateLookup(context.Background(), ms.SourceKeyspace, specs, false) if tcase.err != "" { if err == nil || !strings.Contains(err.Error(), tcase.err) { t.Errorf("prepareCreateLookup(%s) err: %v, must contain %v", tcase.description, err, tcase.err) @@ -1156,13 +1156,89 @@ func TestCreateLookupVindexSameKeyspace(t *testing.T) { t.Fatal(err) } - _, got, _, err := env.wr.prepareCreateLookup(context.Background(), ms.SourceKeyspace, specs) + _, got, _, err := env.wr.prepareCreateLookup(context.Background(), ms.SourceKeyspace, specs, false) require.NoError(t, err) if !proto.Equal(got, want) { t.Errorf("same keyspace: got:\n%v, want\n%v", got, want) } } +func TestStopAfterCopyFlag(t *testing.T) { + ms := &vtctldatapb.MaterializeSettings{ + SourceKeyspace: "ks", + TargetKeyspace: "ks", + } + env := newTestMaterializerEnv(t, ms, []string{"0"}, []string{"0"}) + defer env.close() + specs := &vschemapb.Keyspace{ + Vindexes: map[string]*vschemapb.Vindex{ + "v": { + Type: "lookup_unique", + Params: map[string]string{ + "table": "ks.lkp", + "from": "c1", + "to": "col2", + }, + Owner: "t1", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Name: "v", + Column: "col2", + }}, + }, + }, + } + // Dummy sourceSchema + sourceSchema := "CREATE TABLE `t1` (\n" + + " `col1` int(11) NOT NULL AUTO_INCREMENT,\n" + + " `col2` int(11) DEFAULT NULL,\n" + + " PRIMARY KEY (`id`)\n" + + ") ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=latin1" + + vschema := &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "hash": { + Type: "hash", + }, + }, + Tables: map[string]*vschemapb.Table{ + "t1": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Name: "hash", + Column: "col1", + }}, + }, + }, + } + env.tmc.schema[ms.SourceKeyspace+".t1"] = &tabletmanagerdatapb.SchemaDefinition{ + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{ + Fields: []*querypb.Field{{ + Name: "col1", + Type: querypb.Type_INT64, + }, { + Name: "col2", + Type: querypb.Type_INT64, + }}, + Schema: sourceSchema, + }}, + } + if err := env.topoServ.SaveVSchema(context.Background(), ms.SourceKeyspace, vschema); err != nil { + t.Fatal(err) + } + + ms1, _, _, err := env.wr.prepareCreateLookup(context.Background(), ms.SourceKeyspace, specs, false) + require.NoError(t, err) + require.Equal(t, ms1.StopAfterCopy, true) + + ms2, _, _, err := env.wr.prepareCreateLookup(context.Background(), ms.SourceKeyspace, specs, true) + require.NoError(t, err) + require.Equal(t, ms2.StopAfterCopy, false) +} + func TestCreateLookupVindexFailures(t *testing.T) { topoServ := memorytopo.NewServer("cell") wr := New(logutil.NewConsoleLogger(), topoServ, nil) @@ -1422,7 +1498,7 @@ func TestCreateLookupVindexFailures(t *testing.T) { err: "ColumnVindex for table t1 already exists: c1", }} for _, tcase := range testcases { - err := wr.CreateLookupVindex(context.Background(), "sourceks", tcase.input, "", "") + err := wr.CreateLookupVindex(context.Background(), "sourceks", tcase.input, "", "", false) if !strings.Contains(err.Error(), tcase.err) { t.Errorf("CreateLookupVindex(%s) err: %v, must contain %v", tcase.description, err, tcase.err) }