From b03bca5e8b365d60c87e0ca9bda717c5e3f5507b Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sat, 28 Oct 2023 14:29:19 -0400 Subject: [PATCH 1/7] Skip vindex keyrange filtering when we can Signed-off-by: Matt Lord --- go/vt/wrangler/materializer.go | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/go/vt/wrangler/materializer.go b/go/vt/wrangler/materializer.go index 0fba424eacd..850d9d6f6db 100644 --- a/go/vt/wrangler/materializer.go +++ b/go/vt/wrangler/materializer.go @@ -1029,7 +1029,16 @@ func (wr *Wrangler) prepareMaterializerStreams(ctx context.Context, ms *vtctldat insertMap := make(map[string]string, len(mz.targetShards)) for _, targetShard := range mz.targetShards { sourceShards := mz.filterSourceShards(targetShard) - inserts, err := mz.generateInserts(ctx, sourceShards) + // keyRangesEqual allows us to optimize the stream for the cases where + // while the target keyspace may be sharded, the shard mapping is 1:1 + // between the source and target and the key ranges are equal. This + // can be done, for example, when doing shard by shard migrations -- + // migrating a single shard at a time. 
+ streamKeyRangesEqual := false + if len(sourceShards) == 1 && key.KeyRangeEqual(sourceShards[0].KeyRange, targetShard.KeyRange) { + streamKeyRangesEqual = true + } + inserts, err := mz.generateInserts(ctx, sourceShards, streamKeyRangesEqual) if err != nil { return nil, err } @@ -1319,7 +1328,7 @@ func stripTableConstraints(ddl string) (string, error) { return newDDL, nil } -func (mz *materializer) generateInserts(ctx context.Context, sourceShards []*topo.ShardInfo) (string, error) { +func (mz *materializer) generateInserts(ctx context.Context, sourceShards []*topo.ShardInfo, keyRangesEqual bool) (string, error) { ig := vreplication.NewInsertGenerator(binlogdatapb.VReplicationWorkflowState_Stopped, "{{.dbname}}") for _, sourceShard := range sourceShards { @@ -1353,7 +1362,8 @@ func (mz *materializer) generateInserts(ctx context.Context, sourceShards []*top return "", fmt.Errorf("unrecognized statement: %s", ts.SourceExpression) } filter := ts.SourceExpression - if mz.targetVSchema.Keyspace.Sharded && mz.targetVSchema.Tables[ts.TargetTable].Type != vindexes.TypeReference { + + if !keyRangesEqual && mz.targetVSchema.Keyspace.Sharded && mz.targetVSchema.Tables[ts.TargetTable].Type != vindexes.TypeReference { cv, err := vindexes.FindBestColVindex(mz.targetVSchema.Tables[ts.TargetTable]) if err != nil { return "", err From 6528a3a4912a213f4aa7dfbbdba186f58f1ae9eb Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sat, 28 Oct 2023 17:52:12 -0400 Subject: [PATCH 2/7] Add vtctlclient/wrangler unit test Signed-off-by: Matt Lord --- go/vt/wrangler/materializer.go | 11 +- go/vt/wrangler/materializer_env_test.go | 3 +- go/vt/wrangler/materializer_test.go | 164 ++++++++++++++++++++++++ 3 files changed, 172 insertions(+), 6 deletions(-) diff --git a/go/vt/wrangler/materializer.go b/go/vt/wrangler/materializer.go index 850d9d6f6db..990492bd191 100644 --- a/go/vt/wrangler/materializer.go +++ b/go/vt/wrangler/materializer.go @@ -1029,11 +1029,12 @@ func (wr *Wrangler) 
prepareMaterializerStreams(ctx context.Context, ms *vtctldat insertMap := make(map[string]string, len(mz.targetShards)) for _, targetShard := range mz.targetShards { sourceShards := mz.filterSourceShards(targetShard) - // keyRangesEqual allows us to optimize the stream for the cases where - // while the target keyspace may be sharded, the shard mapping is 1:1 - // between the source and target and the key ranges are equal. This - // can be done, for example, when doing shard by shard migrations -- - // migrating a single shard at a time. + // streamKeyRangesEqual allows us to optimize the stream for the cases + // where while the target keyspace may be sharded, the target shard has + // a single source shard to stream data from and the target and source + // shard have equal key ranges. This can be done, for example, when doing + // shard by shard migrations -- migrating a single shard at a time between + // sharded source and sharded target keyspaces. streamKeyRangesEqual := false if len(sourceShards) == 1 && key.KeyRangeEqual(sourceShards[0].KeyRange, targetShard.KeyRange) { streamKeyRangesEqual = true diff --git a/go/vt/wrangler/materializer_env_test.go b/go/vt/wrangler/materializer_env_test.go index 6c236a038bf..b98621ffa1b 100644 --- a/go/vt/wrangler/materializer_env_test.go +++ b/go/vt/wrangler/materializer_env_test.go @@ -200,12 +200,13 @@ func (env *testMaterializerEnv) addTablet(id int, keyspace, shard string, tablet }, } env.tablets[id] = tablet - if err := env.wr.TopoServer().InitTablet(context.Background(), tablet, false /* allowPrimaryOverride */, true /* createShardAndKeyspace */, false /* allowUpdate */); err != nil { + if err := env.wr.ts.InitTablet(context.Background(), tablet, false /* allowPrimaryOverride */, true /* createShardAndKeyspace */, false /* allowUpdate */); err != nil { panic(err) } if tabletType == topodatapb.TabletType_PRIMARY { _, err := env.wr.ts.UpdateShardFields(context.Background(), keyspace, shard, func(si *topo.ShardInfo) 
error { si.PrimaryAlias = tablet.Alias + si.IsPrimaryServing = true return nil }) if err != nil { diff --git a/go/vt/wrangler/materializer_test.go b/go/vt/wrangler/materializer_test.go index 3984641fcf8..8e501e0c0f8 100644 --- a/go/vt/wrangler/materializer_test.go +++ b/go/vt/wrangler/materializer_test.go @@ -19,12 +19,15 @@ package wrangler import ( "context" "fmt" + "regexp" + "slices" "sort" "strings" "testing" "time" "github.com/stretchr/testify/require" + "google.golang.org/protobuf/encoding/prototext" "google.golang.org/protobuf/proto" "vitess.io/vitess/go/sqltypes" @@ -3510,3 +3513,164 @@ func TestAddTablesToVSchema(t *testing.T) { }) } } + +// TestKeyRangesEqualOptimization tests that we optimize the source +// filtering when there's only one source shard for the stream and +// its keyrange is equal to the target shard for the stream. This +// means that even if the target keyspace is sharded, the source +// does not need to perform the in_keyrange filtering. +func TestKeyRangesEqualOptimization(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + workflow := "testwf" + sourceKs := "sourceks" + targetKs := "targetks" + table := "t1" + mzi := vtctldatapb.MaterializationIntent_MOVETABLES + tableMaterializeSettings := []*vtctldatapb.TableMaterializeSettings{ + { + TargetTable: table, + SourceExpression: fmt.Sprintf("select * from %s", table), + }, + } + targetVSchema := &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "xxhash": { + Type: "xxhash", + }, + }, + Tables: map[string]*vschemapb.Table{ + table: { + ColumnVindexes: []*vschemapb.ColumnVindex{ + { + Column: "id", + Name: "xxhash", + }, + }, + }, + }, + } + + testCases := []struct { + name string + ms *vtctldatapb.MaterializeSettings + sourceShards []string + targetShards []string + wantBls map[string]*binlogdatapb.BinlogSource + }{ + { + name: "no in_keyrange filter -- partial, keyranges equal", + ms: 
&vtctldatapb.MaterializeSettings{ + MaterializationIntent: mzi, + Workflow: workflow, + TargetKeyspace: targetKs, + SourceKeyspace: sourceKs, + Cell: "cell", + SourceShards: []string{"-80"}, // Partial MoveTables just for this shard + TableSettings: tableMaterializeSettings, + }, + sourceShards: []string{"-80", "80-"}, + targetShards: []string{"-80", "80-"}, + wantBls: map[string]*binlogdatapb.BinlogSource{ + "-80": { + Keyspace: sourceKs, + Shard: "-80", // Keyranges are equal between the source and target + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{ + { + Match: table, + Filter: fmt.Sprintf("select * from %s", table), + }, + }, + }, + }, + }, + }, + { + name: "in_keyrange filter -- unequal shards", + ms: &vtctldatapb.MaterializeSettings{ + MaterializationIntent: mzi, + Workflow: workflow, + TargetKeyspace: targetKs, + SourceKeyspace: sourceKs, + Cell: "cell", + TableSettings: tableMaterializeSettings, + }, + sourceShards: []string{"-"}, + targetShards: []string{"-80", "80-"}, + wantBls: map[string]*binlogdatapb.BinlogSource{ + "-80": { + Keyspace: sourceKs, + Shard: "-", + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{ + { + Match: table, + Filter: fmt.Sprintf("select * from %s where in_keyrange(id, '%s.xxhash', '-80')", table, targetKs), + }, + }, + }, + }, + "80-": { + Keyspace: sourceKs, + Shard: "-", + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{ + { + Match: table, + Filter: fmt.Sprintf("select * from %s where in_keyrange(id, '%s.xxhash', '80-')", table, targetKs), + }, + }, + }, + }, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + env := newTestMaterializerEnv(t, ctx, tc.ms, tc.sourceShards, tc.targetShards) + defer env.close() + + // Target is always sharded. 
+ err := env.wr.ts.SaveVSchema(ctx, targetKs, targetVSchema) + require.NoError(t, err, "SaveVSchema failed: %v", err) + + for _, tablet := range env.tablets { + // Queries will only be executed on primary tablets in the target keyspace. + if tablet.Keyspace != targetKs || tablet.Type != topodatapb.TabletType_PRIMARY { + continue + } + env.tmc.expectVRQuery(int(tablet.Alias.Uid), mzSelectFrozenQuery, &sqltypes.Result{}) + // If we are doing a partial MoveTables, we will only perform the workflow + // stream creation / INSERT statement on the shard(s) we're migrating. + if len(tc.ms.SourceShards) > 0 && !slices.Contains(tc.ms.SourceShards, tablet.Shard) { + continue + } + bls := tc.wantBls[tablet.Shard] + require.NotNil(t, bls, "no binlog source defined for tablet %+v", tablet) + if bls.Filter != nil { + for i, rule := range bls.Filter.Rules { + // It's escaped in the SQL statement. + bls.Filter.Rules[i].Filter = strings.ReplaceAll(rule.Filter, `'`, `\'`) + } + } + blsBytes, err := prototext.Marshal(bls) + require.NoError(t, err, "failed to marshal binlog source: %v", err) + // This is also escaped in the SQL statement. + blsStr := strings.ReplaceAll(string(blsBytes), `"`, `\"`) + // Escape the string for the regexp comparison. + blsStr = regexp.QuoteMeta(blsStr) + // For some reason we end up with an extra slash added by QuoteMeta for the + // escaped single quotes in the filter. 
+ blsStr = strings.ReplaceAll(blsStr, `\\\\`, `\\\`) + expectedQuery := fmt.Sprintf(`/insert into _vt.vreplication.* values \('%s', '%s'`, workflow, blsStr) + env.tmc.expectVRQuery(int(tablet.Alias.Uid), expectedQuery, &sqltypes.Result{}) + } + + _, err = env.wr.prepareMaterializerStreams(ctx, tc.ms) + require.NoError(t, err, "prepareMaterializerStreams failed: %v", err) + }) + } +} From 8d19288e0475af194878f96fb8a01fe329d47300 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sun, 29 Oct 2023 13:13:07 -0400 Subject: [PATCH 3/7] vtctldclient implementation Signed-off-by: Matt Lord --- go/vt/vtctl/workflow/materializer.go | 32 ++- go/vt/vtctl/workflow/materializer_env_test.go | 35 ++- go/vt/vtctl/workflow/materializer_test.go | 240 ++++++++++++++++++ 3 files changed, 293 insertions(+), 14 deletions(-) diff --git a/go/vt/vtctl/workflow/materializer.go b/go/vt/vtctl/workflow/materializer.go index 152409540c8..6be5ac7f445 100644 --- a/go/vt/vtctl/workflow/materializer.go +++ b/go/vt/vtctl/workflow/materializer.go @@ -103,7 +103,17 @@ func (mz *materializer) createMoveTablesStreams(req *vtctldatapb.MoveTablesCreat } sourceShards := mz.filterSourceShards(target) - blses, err := mz.generateBinlogSources(mz.ctx, target, sourceShards) + // streamKeyRangesEqual allows us to optimize the stream for the cases + // where while the target keyspace may be sharded, the target shard has + // a single source shard to stream data from and the target and source + // shard have equal key ranges. This can be done, for example, when doing + // shard by shard migrations -- migrating a single shard at a time between + // sharded source and sharded target keyspaces. 
+ streamKeyRangesEqual := false + if len(sourceShards) == 1 && key.KeyRangeEqual(sourceShards[0].KeyRange, target.KeyRange) { + streamKeyRangesEqual = true + } + blses, err := mz.generateBinlogSources(mz.ctx, target, sourceShards, streamKeyRangesEqual) if err != nil { return err } @@ -139,7 +149,17 @@ func (mz *materializer) createMaterializerStreams() error { insertMap := make(map[string]string, len(mz.targetShards)) for _, targetShard := range mz.targetShards { sourceShards := mz.filterSourceShards(targetShard) - inserts, err := mz.generateInserts(mz.ctx, sourceShards) + // streamKeyRangesEqual allows us to optimize the stream for the cases + // where while the target keyspace may be sharded, the target shard has + // a single source shard to stream data from and the target and source + // shard have equal key ranges. This can be done, for example, when doing + // shard by shard migrations -- migrating a single shard at a time between + // sharded source and sharded target keyspaces. + streamKeyRangesEqual := false + if len(sourceShards) == 1 && key.KeyRangeEqual(sourceShards[0].KeyRange, targetShard.KeyRange) { + streamKeyRangesEqual = true + } + inserts, err := mz.generateInserts(mz.ctx, sourceShards, streamKeyRangesEqual) if err != nil { return err } @@ -151,7 +171,7 @@ func (mz *materializer) createMaterializerStreams() error { return nil } -func (mz *materializer) generateInserts(ctx context.Context, sourceShards []*topo.ShardInfo) (string, error) { +func (mz *materializer) generateInserts(ctx context.Context, sourceShards []*topo.ShardInfo, keyRangesEqual bool) (string, error) { ig := vreplication.NewInsertGenerator(binlogdatapb.VReplicationWorkflowState_Stopped, "{{.dbname}}") for _, sourceShard := range sourceShards { @@ -185,7 +205,7 @@ func (mz *materializer) generateInserts(ctx context.Context, sourceShards []*top return "", fmt.Errorf("unrecognized statement: %s", ts.SourceExpression) } filter := ts.SourceExpression - if 
mz.targetVSchema.Keyspace.Sharded && mz.targetVSchema.Tables[ts.TargetTable].Type != vindexes.TypeReference { + if !keyRangesEqual && mz.targetVSchema.Keyspace.Sharded && mz.targetVSchema.Tables[ts.TargetTable].Type != vindexes.TypeReference { cv, err := vindexes.FindBestColVindex(mz.targetVSchema.Tables[ts.TargetTable]) if err != nil { return "", err @@ -251,7 +271,7 @@ func (mz *materializer) generateInserts(ctx context.Context, sourceShards []*top return ig.String(), nil } -func (mz *materializer) generateBinlogSources(ctx context.Context, targetShard *topo.ShardInfo, sourceShards []*topo.ShardInfo) ([]*binlogdatapb.BinlogSource, error) { +func (mz *materializer) generateBinlogSources(ctx context.Context, targetShard *topo.ShardInfo, sourceShards []*topo.ShardInfo, keyRangesEqual bool) ([]*binlogdatapb.BinlogSource, error) { blses := make([]*binlogdatapb.BinlogSource, 0, len(mz.sourceShards)) for _, sourceShard := range sourceShards { bls := &binlogdatapb.BinlogSource{ @@ -284,7 +304,7 @@ func (mz *materializer) generateBinlogSources(ctx context.Context, targetShard * return nil, fmt.Errorf("unrecognized statement: %s", ts.SourceExpression) } filter := ts.SourceExpression - if mz.targetVSchema.Keyspace.Sharded && mz.targetVSchema.Tables[ts.TargetTable].Type != vindexes.TypeReference { + if !keyRangesEqual && mz.targetVSchema.Keyspace.Sharded && mz.targetVSchema.Tables[ts.TargetTable].Type != vindexes.TypeReference { cv, err := vindexes.FindBestColVindex(mz.targetVSchema.Tables[ts.TargetTable]) if err != nil { return nil, err diff --git a/go/vt/vtctl/workflow/materializer_env_test.go b/go/vt/vtctl/workflow/materializer_env_test.go index f1ddf6be645..9fa9d049b5f 100644 --- a/go/vt/vtctl/workflow/materializer_env_test.go +++ b/go/vt/vtctl/workflow/materializer_env_test.go @@ -26,19 +26,23 @@ import ( "sync" "testing" - _flag "vitess.io/vitess/go/internal/flag" + "google.golang.org/protobuf/proto" + "vitess.io/vitess/go/sqltypes" 
"vitess.io/vitess/go/vt/mysqlctl/tmutils" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/memorytopo" + "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vttablet/tmclient" + _flag "vitess.io/vitess/go/internal/flag" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" querypb "vitess.io/vitess/go/vt/proto/query" tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) type queryResult struct { @@ -154,6 +158,7 @@ func (env *testMaterializerEnv) addTablet(id int, keyspace, shard string, tablet if tabletType == topodatapb.TabletType_PRIMARY { _, err := env.ws.ts.UpdateShardFields(context.Background(), keyspace, shard, func(si *topo.ShardInfo) error { si.PrimaryAlias = tablet.Alias + si.IsPrimaryServing = true return nil }) if err != nil { @@ -175,10 +180,11 @@ type testMaterializerTMClient struct { tmclient.TabletManagerClient schema map[string]*tabletmanagerdatapb.SchemaDefinition - mu sync.Mutex - vrQueries map[int][]*queryResult - getSchemaCounts map[string]int - muSchemaCount sync.Mutex + mu sync.Mutex + vrQueries map[int][]*queryResult + createVReplicationWorkflowRequests map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest + getSchemaCounts map[string]int + muSchemaCount sync.Mutex // Used to confirm the number of times WorkflowDelete was called. 
workflowDeleteCalls int @@ -186,9 +192,10 @@ type testMaterializerTMClient struct { func newTestMaterializerTMClient() *testMaterializerTMClient { return &testMaterializerTMClient{ - schema: make(map[string]*tabletmanagerdatapb.SchemaDefinition), - vrQueries: make(map[int][]*queryResult), - getSchemaCounts: make(map[string]int), + schema: make(map[string]*tabletmanagerdatapb.SchemaDefinition), + vrQueries: make(map[int][]*queryResult), + createVReplicationWorkflowRequests: make(map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest), + getSchemaCounts: make(map[string]int), } } @@ -205,6 +212,11 @@ func (tmc *testMaterializerTMClient) schemaRequested(uid uint32) { } func (tmc *testMaterializerTMClient) CreateVReplicationWorkflow(ctx context.Context, tablet *topodatapb.Tablet, request *tabletmanagerdatapb.CreateVReplicationWorkflowRequest) (*tabletmanagerdatapb.CreateVReplicationWorkflowResponse, error) { + if expect := tmc.createVReplicationWorkflowRequests[tablet.Alias.Uid]; expect != nil { + if !proto.Equal(expect, request) { + return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "Unexpected CreateVReplicationWorkflow request: got %+v, want %+v", request, expect) + } + } res := sqltypes.MakeTestResult(sqltypes.MakeTestFields("rowsaffected", "int64"), "1") return &tabletmanagerdatapb.CreateVReplicationWorkflowResponse{Result: sqltypes.ResultToProto3(res)}, nil } @@ -288,6 +300,13 @@ func (tmc *testMaterializerTMClient) expectVRQuery(tabletID int, query string, r }) } +func (tmc *testMaterializerTMClient) expectVReplicationWorkflowRequests(tabletID uint32, req *tabletmanagerdatapb.CreateVReplicationWorkflowRequest) { + tmc.mu.Lock() + defer tmc.mu.Unlock() + + tmc.createVReplicationWorkflowRequests[tabletID] = req +} + func (tmc *testMaterializerTMClient) verifyQueries(t *testing.T) { t.Helper() tmc.mu.Lock() diff --git a/go/vt/vtctl/workflow/materializer_test.go b/go/vt/vtctl/workflow/materializer_test.go index 5121590e3c4..035451674a2 100644 --- 
a/go/vt/vtctl/workflow/materializer_test.go +++ b/go/vt/vtctl/workflow/materializer_test.go @@ -19,10 +19,13 @@ package workflow import ( "context" "fmt" + "slices" "strings" "testing" + "time" "github.com/stretchr/testify/require" + "golang.org/x/exp/maps" "google.golang.org/protobuf/proto" "vitess.io/vitess/go/sqltypes" @@ -3284,3 +3287,240 @@ func TestMaterializerNoVindexInExpression(t *testing.T) { err := env.ws.Materialize(ctx, ms) require.EqualError(t, err, "could not find vindex column c1") } + +// TestKeyRangesEqualOptimization tests that we optimize the source +// filtering when there's only one source shard for the stream and +// its keyrange is equal to the target shard for the stream. This +// means that even if the target keyspace is sharded, the source +// does not need to perform the in_keyrange filtering. +func TestKeyRangesEqualOptimization(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + workflow := "testwf" + cells := []string{"cell"} + sourceKs := "sourceks" + targetKs := "targetks" + table := "t1" + tableSettings := []*vtctldatapb.TableMaterializeSettings{{ + TargetTable: table, + SourceExpression: fmt.Sprintf("select * from %s", table), + }} + targetVSchema := &vschemapb.Keyspace{ + Sharded: true, + Vindexes: map[string]*vschemapb.Vindex{ + "xxhash": { + Type: "xxhash", + }, + }, + Tables: map[string]*vschemapb.Table{ + table: { + ColumnVindexes: []*vschemapb.ColumnVindex{ + { + Column: "id", + Name: "xxhash", + }, + }, + }, + }, + } + + testCases := []struct { + name string + sourceShards []string + targetShards []string + moveTablesReq *vtctldatapb.MoveTablesCreateRequest + // Target Shards are in the order specified in the targetShards slice + // with the UIDs starting at 200 and increasing by 10 for each tablet + // and shard since there's only a primary tablet per shard. 
+ wantReqs map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest + }{ + { + name: "no in_keyrange filter -- partial, one equal shard", + moveTablesReq: &vtctldatapb.MoveTablesCreateRequest{ + Workflow: workflow, + TargetKeyspace: targetKs, + SourceKeyspace: sourceKs, + Cells: []string{"cell"}, + SourceShards: []string{"-80"}, // Partial MoveTables just for this shard + IncludeTables: []string{table}, + }, + sourceShards: []string{"-80", "80-"}, + targetShards: []string{"-80", "80-"}, + wantReqs: map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest{ + 200: { + Workflow: workflow, + WorkflowType: binlogdatapb.VReplicationWorkflowType_MoveTables, + WorkflowSubType: binlogdatapb.VReplicationWorkflowSubType_Partial, + Cells: cells, + BinlogSource: []*binlogdatapb.BinlogSource{ + { + Keyspace: sourceKs, + Shard: "-80", // Keyranges are equal between the source and target + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{ + { + Match: table, + Filter: fmt.Sprintf("select * from %s", table), + }, + }, + }, + }, + }, + }, + }, + }, + { + name: "in_keyrange filter -- unequal shards", + moveTablesReq: &vtctldatapb.MoveTablesCreateRequest{ + Workflow: workflow, + TargetKeyspace: targetKs, + SourceKeyspace: sourceKs, + Cells: []string{"cell"}, + IncludeTables: []string{table}, + }, + sourceShards: []string{"-"}, + targetShards: []string{"-80", "80-"}, + wantReqs: map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest{ + 200: { + Workflow: workflow, + WorkflowType: binlogdatapb.VReplicationWorkflowType_MoveTables, + Cells: cells, + BinlogSource: []*binlogdatapb.BinlogSource{ + { + Keyspace: sourceKs, + Shard: "-", + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{ + { + Match: table, + Filter: fmt.Sprintf("select * from %s where in_keyrange(id, '%s.xxhash', '-80')", table, targetKs), + }, + }, + }, + }, + }, + }, + 210: { + Workflow: workflow, + WorkflowType: binlogdatapb.VReplicationWorkflowType_MoveTables, + Cells: 
cells, + BinlogSource: []*binlogdatapb.BinlogSource{ + { + Keyspace: sourceKs, + Shard: "-", + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{ + { + Match: table, + Filter: fmt.Sprintf("select * from %s where in_keyrange(id, '%s.xxhash', '80-')", table, targetKs), + }, + }, + }, + }, + }, + }, + }, + }, + { + name: "no in_keyrange filter -- all equal shards", + moveTablesReq: &vtctldatapb.MoveTablesCreateRequest{ + Workflow: workflow, + TargetKeyspace: targetKs, + SourceKeyspace: sourceKs, + Cells: []string{"cell"}, + IncludeTables: []string{table}, + }, + sourceShards: []string{"-80", "80-"}, + targetShards: []string{"-80", "80-"}, + wantReqs: map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest{ + 200: { + Workflow: workflow, + WorkflowType: binlogdatapb.VReplicationWorkflowType_MoveTables, + Cells: cells, + BinlogSource: []*binlogdatapb.BinlogSource{ + { + Keyspace: sourceKs, + Shard: "-80", + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{ + { + Match: table, + Filter: fmt.Sprintf("select * from %s", table), + }, + }, + }, + }, + }, + }, + 210: { + Workflow: workflow, + WorkflowType: binlogdatapb.VReplicationWorkflowType_MoveTables, + Cells: cells, + BinlogSource: []*binlogdatapb.BinlogSource{ + { + Keyspace: sourceKs, + Shard: "80-", + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{ + { + Match: table, + Filter: fmt.Sprintf("select * from %s", table), + }, + }, + }, + }, + }, + }, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + if len(tc.wantReqs) == 0 { + require.FailNow(t, "invalid test case", "no wanted requests specified") + } + workflowType := maps.Values(tc.wantReqs)[0].WorkflowType + ms := &vtctldatapb.MaterializeSettings{ + Workflow: tc.moveTablesReq.Workflow, + MaterializationIntent: vtctldatapb.MaterializationIntent_MOVETABLES, + SourceKeyspace: sourceKs, + TargetKeyspace: targetKs, + Cell: strings.Join(tc.moveTablesReq.Cells, ","), + SourceShards: 
tc.moveTablesReq.SourceShards, + TableSettings: tableSettings, + } + env := newTestMaterializerEnv(t, ctx, ms, tc.sourceShards, tc.targetShards) + defer env.close() + + // Target is always sharded. + err := env.ws.ts.SaveVSchema(ctx, targetKs, targetVSchema) + require.NoError(t, err, "SaveVSchema failed: %v", err) + + for _, tablet := range env.tablets { + // Queries will only be executed on primary tablets in the target keyspace. + if tablet.Keyspace != targetKs || tablet.Type != topodatapb.TabletType_PRIMARY { + continue + } + env.tmc.expectVRQuery(int(tablet.Alias.Uid), mzSelectFrozenQuery, &sqltypes.Result{}) + // If we are doing a partial MoveTables, we will only perform the workflow + // stream creation / INSERT statement on the shard(s) we're migrating. + if len(tc.moveTablesReq.SourceShards) > 0 && !slices.Contains(tc.moveTablesReq.SourceShards, tablet.Shard) { + continue + } + env.tmc.expectVReplicationWorkflowRequests(tablet.Alias.Uid, tc.wantReqs[tablet.Alias.Uid]) + } + + mz := &materializer{ + ctx: ctx, + ts: env.ws.ts, + sourceTs: env.ws.ts, + tmc: env.tmc, + ms: ms, + workflowType: workflowType, + } + err = mz.createMoveTablesStreams(tc.moveTablesReq) + require.NoError(t, err, "prepareMaterializerStreams failed: %v", err) + }) + } +} From 42da9501409a1f8e5f069b561c0824f252e44a84 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sun, 29 Oct 2023 15:07:33 -0400 Subject: [PATCH 4/7] Add same additional test case in wrangler Signed-off-by: Matt Lord --- go/vt/wrangler/materializer_test.go | 41 ++++++++++++++++++++++++++++- 1 file changed, 40 insertions(+), 1 deletion(-) diff --git a/go/vt/wrangler/materializer_test.go b/go/vt/wrangler/materializer_test.go index 8e501e0c0f8..f68847d8a74 100644 --- a/go/vt/wrangler/materializer_test.go +++ b/go/vt/wrangler/materializer_test.go @@ -3560,7 +3560,7 @@ func TestKeyRangesEqualOptimization(t *testing.T) { wantBls map[string]*binlogdatapb.BinlogSource }{ { - name: "no in_keyrange filter -- partial, keyranges equal", 
+ name: "no in_keyrange filter -- partial, one equal shard", ms: &vtctldatapb.MaterializeSettings{ MaterializationIntent: mzi, Workflow: workflow, @@ -3626,6 +3626,45 @@ func TestKeyRangesEqualOptimization(t *testing.T) { }, }, }, + { + name: "no in_keyrange filter -- all equal shards", + ms: &vtctldatapb.MaterializeSettings{ + MaterializationIntent: mzi, + Workflow: workflow, + TargetKeyspace: targetKs, + SourceKeyspace: sourceKs, + Cell: "cell", + TableSettings: tableMaterializeSettings, + }, + sourceShards: []string{"-80", "80-"}, + targetShards: []string{"-80", "80-"}, + wantBls: map[string]*binlogdatapb.BinlogSource{ + "-80": { + Keyspace: sourceKs, + Shard: "-80", + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{ + { + Match: table, + Filter: fmt.Sprintf("select * from %s", table), + }, + }, + }, + }, + "80-": { + Keyspace: sourceKs, + Shard: "80-", + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{ + { + Match: table, + Filter: fmt.Sprintf("select * from %s", table), + }, + }, + }, + }, + }, + }, } for _, tc := range testCases { From 19a22d0f2b5632ff84f1e441d4466a3611ef7459 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sun, 29 Oct 2023 15:43:06 -0400 Subject: [PATCH 5/7] Add a shard consolidation/merge test case Signed-off-by: Matt Lord --- go/vt/vtctl/workflow/materializer_test.go | 45 +++++++++++++++++++++++ go/vt/wrangler/materializer_test.go | 27 ++++++++++++++ 2 files changed, 72 insertions(+) diff --git a/go/vt/vtctl/workflow/materializer_test.go b/go/vt/vtctl/workflow/materializer_test.go index 035451674a2..ebec564a139 100644 --- a/go/vt/vtctl/workflow/materializer_test.go +++ b/go/vt/vtctl/workflow/materializer_test.go @@ -3421,6 +3421,51 @@ func TestKeyRangesEqualOptimization(t *testing.T) { }, }, }, + { + name: "in_keyrange filter -- unequal shards on merge", + moveTablesReq: &vtctldatapb.MoveTablesCreateRequest{ + Workflow: workflow, + TargetKeyspace: targetKs, + SourceKeyspace: sourceKs, + Cells: []string{"cell"}, + 
IncludeTables: []string{table}, + }, + sourceShards: []string{"-80", "80-"}, + targetShards: []string{"-"}, + wantReqs: map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest{ + 200: { + Workflow: workflow, + WorkflowType: binlogdatapb.VReplicationWorkflowType_MoveTables, + Cells: cells, + BinlogSource: []*binlogdatapb.BinlogSource{ + { + Keyspace: sourceKs, + Shard: "-80", + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{ + { + Match: table, + Filter: fmt.Sprintf("select * from %s where in_keyrange(id, '%s.xxhash', '-')", table, targetKs), + }, + }, + }, + }, + { + Keyspace: sourceKs, + Shard: "80-", + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{ + { + Match: table, + Filter: fmt.Sprintf("select * from %s where in_keyrange(id, '%s.xxhash', '-')", table, targetKs), + }, + }, + }, + }, + }, + }, + }, + }, { name: "no in_keyrange filter -- all equal shards", moveTablesReq: &vtctldatapb.MoveTablesCreateRequest{ diff --git a/go/vt/wrangler/materializer_test.go b/go/vt/wrangler/materializer_test.go index f68847d8a74..91dfff80d11 100644 --- a/go/vt/wrangler/materializer_test.go +++ b/go/vt/wrangler/materializer_test.go @@ -3626,6 +3626,33 @@ func TestKeyRangesEqualOptimization(t *testing.T) { }, }, }, + { + name: "in_keyrange filter -- unequal shards on merge", + ms: &vtctldatapb.MaterializeSettings{ + MaterializationIntent: mzi, + Workflow: workflow, + TargetKeyspace: targetKs, + SourceKeyspace: sourceKs, + Cell: "cell", + TableSettings: tableMaterializeSettings, + }, + sourceShards: []string{"-80", "80-"}, + targetShards: []string{"-"}, + wantBls: map[string]*binlogdatapb.BinlogSource{ + "-": { + Keyspace: sourceKs, + Shard: "-80", + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{ + { + Match: table, + Filter: fmt.Sprintf("select * from %s where in_keyrange(id, '%s.xxhash', '-')", table, targetKs), + }, + }, + }, + }, + }, + }, { name: "no in_keyrange filter -- all equal shards", ms: &vtctldatapb.MaterializeSettings{ 
From cf71f925fe50ea250ed0f7e8cfa7cb98feb8c7f3 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sun, 29 Oct 2023 21:26:59 -0400 Subject: [PATCH 6/7] Improve name of test func Signed-off-by: Matt Lord --- go/vt/vtctl/workflow/materializer_env_test.go | 2 +- go/vt/vtctl/workflow/materializer_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/go/vt/vtctl/workflow/materializer_env_test.go b/go/vt/vtctl/workflow/materializer_env_test.go index 9fa9d049b5f..cb0da07228c 100644 --- a/go/vt/vtctl/workflow/materializer_env_test.go +++ b/go/vt/vtctl/workflow/materializer_env_test.go @@ -300,7 +300,7 @@ func (tmc *testMaterializerTMClient) expectVRQuery(tabletID int, query string, r }) } -func (tmc *testMaterializerTMClient) expectVReplicationWorkflowRequests(tabletID uint32, req *tabletmanagerdatapb.CreateVReplicationWorkflowRequest) { +func (tmc *testMaterializerTMClient) expectCreateVReplicationWorkflowRequest(tabletID uint32, req *tabletmanagerdatapb.CreateVReplicationWorkflowRequest) { tmc.mu.Lock() defer tmc.mu.Unlock() diff --git a/go/vt/vtctl/workflow/materializer_test.go b/go/vt/vtctl/workflow/materializer_test.go index ebec564a139..c48dfec1bc9 100644 --- a/go/vt/vtctl/workflow/materializer_test.go +++ b/go/vt/vtctl/workflow/materializer_test.go @@ -3553,7 +3553,7 @@ func TestKeyRangesEqualOptimization(t *testing.T) { if len(tc.moveTablesReq.SourceShards) > 0 && !slices.Contains(tc.moveTablesReq.SourceShards, tablet.Shard) { continue } - env.tmc.expectVReplicationWorkflowRequests(tablet.Alias.Uid, tc.wantReqs[tablet.Alias.Uid]) + env.tmc.expectCreateVReplicationWorkflowRequest(tablet.Alias.Uid, tc.wantReqs[tablet.Alias.Uid]) } mz := &materializer{ From 6eab98158326b85756a61022d438c0f517ebf930 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 30 Oct 2023 08:39:23 -0400 Subject: [PATCH 7/7] Two minor error message tweaks Signed-off-by: Matt Lord --- go/vt/vtctl/workflow/materializer_env_test.go | 2 +- go/vt/vtctl/workflow/materializer_test.go | 2 
+- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/go/vt/vtctl/workflow/materializer_env_test.go b/go/vt/vtctl/workflow/materializer_env_test.go index cb0da07228c..1026628405e 100644 --- a/go/vt/vtctl/workflow/materializer_env_test.go +++ b/go/vt/vtctl/workflow/materializer_env_test.go @@ -214,7 +214,7 @@ func (tmc *testMaterializerTMClient) schemaRequested(uid uint32) { func (tmc *testMaterializerTMClient) CreateVReplicationWorkflow(ctx context.Context, tablet *topodatapb.Tablet, request *tabletmanagerdatapb.CreateVReplicationWorkflowRequest) (*tabletmanagerdatapb.CreateVReplicationWorkflowResponse, error) { if expect := tmc.createVReplicationWorkflowRequests[tablet.Alias.Uid]; expect != nil { if !proto.Equal(expect, request) { - return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "Unexpected CreateVReplicationWorkflow request: got %+v, want %+v", request, expect) + return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unexpected CreateVReplicationWorkflow request: got %+v, want %+v", request, expect) } } res := sqltypes.MakeTestResult(sqltypes.MakeTestFields("rowsaffected", "int64"), "1") diff --git a/go/vt/vtctl/workflow/materializer_test.go b/go/vt/vtctl/workflow/materializer_test.go index c48dfec1bc9..9b5c7c5c1cd 100644 --- a/go/vt/vtctl/workflow/materializer_test.go +++ b/go/vt/vtctl/workflow/materializer_test.go @@ -3565,7 +3565,7 @@ func TestKeyRangesEqualOptimization(t *testing.T) { workflowType: workflowType, } err = mz.createMoveTablesStreams(tc.moveTablesReq) - require.NoError(t, err, "prepareMaterializerStreams failed: %v", err) + require.NoError(t, err, "createMoveTablesStreams failed: %v", err) }) } }