diff --git a/go/vt/topo/cell_info.go b/go/vt/topo/cell_info.go index ca700890b57..ba59b656067 100644 --- a/go/vt/topo/cell_info.go +++ b/go/vt/topo/cell_info.go @@ -18,6 +18,7 @@ package topo import ( "path" + "strings" "context" @@ -171,3 +172,41 @@ func (ts *Server) GetKnownCells(ctx context.Context) ([]string, error) { } return DirEntriesToStringArray(entries), nil } + +// ExpandCells takes a comma-separated list of cells and returns an array of cell names +// Aliases are expanded and an empty string returns all cells +func (ts *Server) ExpandCells(ctx context.Context, cells string) ([]string, error) { + var err error + var outputCells []string + inputCells := strings.Split(cells, ",") + if cells == "" { + inputCells, err = ts.GetCellInfoNames(ctx) + if err != nil { + return nil, err + } + } + + for _, cell := range inputCells { + cell2 := strings.TrimSpace(cell) + shortCtx, cancel := context.WithTimeout(ctx, *RemoteOperationTimeout) + defer cancel() + _, err := ts.GetCellInfo(shortCtx, cell2, false) + if err != nil { + // not a valid cell, check whether it is a cell alias + shortCtx, cancel := context.WithTimeout(ctx, *RemoteOperationTimeout) + defer cancel() + alias, err2 := ts.GetCellsAlias(shortCtx, cell2, false) + // if we get an error, either cellAlias doesn't exist or it isn't a cell alias at all. Ignore and continue + if err2 == nil { + outputCells = append(outputCells, alias.Cells...) + } + if err != nil { + return nil, err + } + } else { + // valid cell, add it to our list + outputCells = append(outputCells, cell2) + } + } + return outputCells, nil +} diff --git a/go/vt/topo/srv_keyspace.go b/go/vt/topo/srv_keyspace.go index 3b0fa54141f..d98869eb1d7 100644 --- a/go/vt/topo/srv_keyspace.go +++ b/go/vt/topo/srv_keyspace.go @@ -701,3 +701,22 @@ func ShardIsServing(srvKeyspace *topodatapb.SrvKeyspace, shard *topodatapb.Shard } return false } + +// ValidateSrvKeyspace validates that the SrvKeyspace for given keyspace in the provided cells is not corrupted +func (ts *Server) ValidateSrvKeyspace(ctx context.Context, keyspace, cells string) error { + cellsToValidate, err := ts.ExpandCells(ctx, cells) + if err != nil { + return err + } + for _, cell := range cellsToValidate { + srvKeyspace, err := ts.GetSrvKeyspace(ctx, cell, keyspace) + if err != nil { + return err + } + err = OrderAndCheckPartitions(cell, srvKeyspace) + if err != nil { + return err + } + } + return nil +} diff --git a/go/vt/topo/topotests/cell_info_test.go b/go/vt/topo/topotests/cell_info_test.go index 8d6ea160109..7854763a683 100644 --- a/go/vt/topo/topotests/cell_info_test.go +++ b/go/vt/topo/topotests/cell_info_test.go @@ -18,8 +18,11 @@ package topotests import ( "fmt" + "strings" "testing" + "github.com/stretchr/testify/require" + "context" "vitess.io/vitess/go/vt/topo" @@ -44,6 +47,11 @@ func TestCellInfo(t *testing.T) { t.Fatalf("unexpected CellInfo: %v", ci) } + var cells []string + cells, err = ts.ExpandCells(ctx, cell) + require.NoError(t, err) + require.EqualValues(t, []string{"cell1"}, cells) + // Update the Server Address. if err := ts.UpdateCellInfoFields(ctx, cell, func(ci *topodatapb.CellInfo) error { ci.ServerAddress = "new address" @@ -124,3 +132,48 @@ func TestCellInfo(t *testing.T) { t.Fatalf("GetCellInfo(non-existing cell) failed: %v", err) } } + +func TestExpandCells(t *testing.T) { + ctx := context.Background() + var cells []string + var err error + var allCells = "cell1,cell2,cell3" + type testCase struct { + name string + cellsIn string + cellsOut []string + errString string + } + + testCases := []testCase{ + {"single", "cell1", []string{"cell1"}, ""}, + {"multiple", "cell1,cell2,cell3", []string{"cell1", "cell2", "cell3"}, ""}, + {"empty", "", []string{"cell1", "cell2", "cell3"}, ""}, + {"bad", "unknown", nil, "node doesn't exist"}, + } + + for _, tCase := range testCases { + t.Run(tCase.name, func(t *testing.T) { + cellsIn := tCase.cellsIn + if cellsIn == "" { + cellsIn = allCells + } + topoCells := strings.Split(cellsIn, ",") + var ts *topo.Server + if tCase.name == "bad" { + ts = memorytopo.NewServer() + } else { + ts = memorytopo.NewServer(topoCells...) + } + cells, err = ts.ExpandCells(ctx, cellsIn) + if tCase.errString != "" { + require.Error(t, err) + require.Contains(t, err.Error(), tCase.errString) + } else { + require.NoError(t, err) + } + require.EqualValues(t, tCase.cellsOut, cells) + }) + } + +} diff --git a/go/vt/topo/topotests/srv_keyspace_test.go b/go/vt/topo/topotests/srv_keyspace_test.go index 40d418e63dc..149cc14c54d 100644 --- a/go/vt/topo/topotests/srv_keyspace_test.go +++ b/go/vt/topo/topotests/srv_keyspace_test.go @@ -23,6 +23,8 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "context" "github.com/golang/protobuf/proto" @@ -1170,3 +1172,71 @@ func TestMasterMigrateServedType(t *testing.T) { t.Errorf("MigrateServedType() failure. Got %v, want: %v", string(got), string(want)) } } + +func TestValidateSrvKeyspace(t *testing.T) { + cell := "cell1" + cell2 := "cell2" + keyspace := "ks1" + ctx := context.Background() + ts := memorytopo.NewServer(cell, cell2) + + leftKeyRange, err := key.ParseShardingSpec("-80") + if err != nil || len(leftKeyRange) != 1 { + t.Fatalf("ParseShardingSpec failed. Expected non error and only one element. Got err: %v, len(%v)", err, len(leftKeyRange)) + } + + rightKeyRange, err := key.ParseShardingSpec("80-") + if err != nil || len(rightKeyRange) != 1 { + t.Fatalf("ParseShardingSpec failed. Expected non error and only one element. Got err: %v, len(%v)", err, len(rightKeyRange)) + } + + correct := &topodatapb.SrvKeyspace{ + Partitions: []*topodatapb.SrvKeyspace_KeyspacePartition{ + { + ServedType: topodatapb.TabletType_MASTER, + ShardReferences: []*topodatapb.ShardReference{ + { + Name: "-80", + KeyRange: leftKeyRange[0], + }, + { + Name: "80-", + KeyRange: rightKeyRange[0], + }, + }, + }, + }, + } + + incorrect := &topodatapb.SrvKeyspace{ + Partitions: []*topodatapb.SrvKeyspace_KeyspacePartition{ + { + ServedType: topodatapb.TabletType_MASTER, + ShardReferences: []*topodatapb.ShardReference{ + { + Name: "80-", + KeyRange: rightKeyRange[0], + }, + }, + }, + }, + } + + if err := ts.UpdateSrvKeyspace(ctx, cell, keyspace, correct); err != nil { + t.Fatalf("UpdateSrvKeyspace() failed: %v", err) + } + + if err := ts.UpdateSrvKeyspace(ctx, cell2, keyspace, incorrect); err != nil { + t.Fatalf("UpdateSrvKeyspace() failed: %v", err) + } + errMsg := "keyspace partition for MASTER in cell cell2 does not start with min key" + err = ts.ValidateSrvKeyspace(ctx, keyspace, "cell1,cell2") + require.EqualError(t, err, errMsg) + + err = ts.ValidateSrvKeyspace(ctx, keyspace, "cell1") + require.NoError(t, err) + err = ts.ValidateSrvKeyspace(ctx, keyspace, "cell2") + require.EqualError(t, err, errMsg) + err = ts.ValidateSrvKeyspace(ctx, keyspace, "") + require.EqualError(t, err, errMsg) +} diff --git a/go/vt/wrangler/resharder.go b/go/vt/wrangler/resharder.go index 3a12aa9c1cd..27de39c859b 100644 --- a/go/vt/wrangler/resharder.go +++ b/go/vt/wrangler/resharder.go @@ -17,11 +17,12 @@ limitations under the License. package wrangler import ( + "context" "fmt" "sync" "time" - "context" + "vitess.io/vitess/go/vt/log" "github.com/golang/protobuf/proto" "github.com/pkg/errors" @@ -67,6 +68,12 @@ func (wr *Wrangler) Reshard(ctx context.Context, keyspace, workflow string, sour if err := wr.validateNewWorkflow(ctx, keyspace, workflow); err != nil { return err } + if err := wr.ts.ValidateSrvKeyspace(ctx, keyspace, cell); err != nil { + err2 := vterrors.Wrapf(err, "SrvKeyspace for keyspace %s is corrupt in cell %s", keyspace, cell) + log.Errorf("%w", err2) + return err2 + } + rs, err := wr.buildResharder(ctx, keyspace, workflow, sources, targets, cell, tabletTypes) if err != nil { return vterrors.Wrap(err, "buildResharder") diff --git a/go/vt/wrangler/resharder_env_test.go b/go/vt/wrangler/resharder_env_test.go index d3aad21f1d1..ed0c20ab638 100644 --- a/go/vt/wrangler/resharder_env_test.go +++ b/go/vt/wrangler/resharder_env_test.go @@ -24,6 +24,10 @@ import ( "sync" "testing" + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/vt/key" + "context" "vitess.io/vitess/go/sqltypes" @@ -55,7 +59,36 @@ var ( //---------------------------------------------- // testResharderEnv -func newTestResharderEnv(sources, targets []string) *testResharderEnv { +func getPartition(t *testing.T, shards []string) *topodatapb.SrvKeyspace_KeyspacePartition { + partition := &topodatapb.SrvKeyspace_KeyspacePartition{ + ServedType: topodatapb.TabletType_MASTER, + ShardReferences: []*topodatapb.ShardReference{}, + } + for _, shard := range shards { + keyRange, err := key.ParseShardingSpec(shard) + require.NoError(t, err) + require.Equal(t, 1, len(keyRange)) + partition.ShardReferences = append(partition.ShardReferences, &topodatapb.ShardReference{ + Name: shard, + KeyRange: keyRange[0], + }) + } + return partition +} +func initTopo(t *testing.T, topo *topo.Server, keyspace string, sources, targets, cells []string) { + ctx := context.Background() + srvKeyspace := &topodatapb.SrvKeyspace{ + Partitions: []*topodatapb.SrvKeyspace_KeyspacePartition{}, + } + srvKeyspace.Partitions = append(srvKeyspace.Partitions, getPartition(t, sources)) + srvKeyspace.Partitions = append(srvKeyspace.Partitions, getPartition(t, targets)) + for _, cell := range cells { + topo.UpdateSrvKeyspace(ctx, cell, keyspace, srvKeyspace) + } + topo.ValidateSrvKeyspace(ctx, keyspace, strings.Join(cells, ",")) +} + +func newTestResharderEnv(t *testing.T, sources, targets []string) *testResharderEnv { env := &testResharderEnv{ keyspace: "ks", workflow: "resharderTest", @@ -67,7 +100,7 @@ func newTestResharderEnv(sources, targets []string) *testResharderEnv { tmc: newTestResharderTMClient(), } env.wr = New(logutil.NewConsoleLogger(), env.topoServ, env.tmc) - + initTopo(t, env.topoServ, "ks", sources, targets, []string{"cell"}) tabletID := 100 for _, shard := range sources { _ = env.addTablet(tabletID, env.keyspace, shard, topodatapb.TabletType_MASTER) diff --git a/go/vt/wrangler/resharder_test.go b/go/vt/wrangler/resharder_test.go index 8fea9146172..7b7199fee19 100644 --- a/go/vt/wrangler/resharder_test.go +++ b/go/vt/wrangler/resharder_test.go @@ -39,7 +39,7 @@ const insertPrefix = `/insert into _vt.vreplication\(workflow, source, pos, max_ const eol = "$" func TestResharderOneToMany(t *testing.T) { - env := newTestResharderEnv([]string{"0"}, []string{"-80", "80-"}) + env := newTestResharderEnv(t, []string{"0"}, []string{"-80", "80-"}) defer env.close() schm := &tabletmanagerdatapb.SchemaDefinition{ @@ -73,7 +73,7 @@ func TestResharderOneToMany(t *testing.T) { testCases = append(testCases, newTestCase("", "replica,rdonly")) for _, tc := range testCases { - env := newTestResharderEnv([]string{"0"}, []string{"-80", "80-"}) + env := newTestResharderEnv(t, []string{"0"}, []string{"-80", "80-"}) schm := &tabletmanagerdatapb.SchemaDefinition{ TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{ @@ -115,7 +115,7 @@ func TestResharderOneToMany(t *testing.T) { } func TestResharderManyToOne(t *testing.T) { - env := newTestResharderEnv([]string{"-80", "80-"}, []string{"0"}) + env := newTestResharderEnv(t, []string{"-80", "80-"}, []string{"0"}) defer env.close() schm := &tabletmanagerdatapb.SchemaDefinition{ @@ -148,7 +148,7 @@ func TestResharderManyToOne(t *testing.T) { } func TestResharderManyToMany(t *testing.T) { - env := newTestResharderEnv([]string{"-40", "40-"}, []string{"-80", "80-"}) + env := newTestResharderEnv(t, []string{"-40", "40-"}, []string{"-80", "80-"}) defer env.close() schm := &tabletmanagerdatapb.SchemaDefinition{ @@ -191,7 +191,7 @@ func TestResharderManyToMany(t *testing.T) { // TestResharderOneRefTable tests the case where there's one ref table, but no stream for it. // This means that the table is being updated manually. func TestResharderOneRefTable(t *testing.T) { - env := newTestResharderEnv([]string{"0"}, []string{"-80", "80-"}) + env := newTestResharderEnv(t, []string{"0"}, []string{"-80", "80-"}) defer env.close() schm := &tabletmanagerdatapb.SchemaDefinition{ @@ -243,7 +243,7 @@ func TestResharderOneRefTable(t *testing.T) { // TestReshardStopFlags tests the flags -stop_started and -stop_after_copy func TestReshardStopFlags(t *testing.T) { - env := newTestResharderEnv([]string{"0"}, []string{"-80", "80-"}) + env := newTestResharderEnv(t, []string{"0"}, []string{"-80", "80-"}) defer env.close() schm := &tabletmanagerdatapb.SchemaDefinition{ @@ -294,7 +294,7 @@ func TestReshardStopFlags(t *testing.T) { // TestResharderOneRefStream tests the case where there's one ref table and an associated stream. func TestResharderOneRefStream(t *testing.T) { - env := newTestResharderEnv([]string{"0"}, []string{"-80", "80-"}) + env := newTestResharderEnv(t, []string{"0"}, []string{"-80", "80-"}) defer env.close() schm := &tabletmanagerdatapb.SchemaDefinition{ @@ -362,7 +362,7 @@ func TestResharderOneRefStream(t *testing.T) { // TestResharderNoRefStream tests the case where there's a stream, but it's not a reference. func TestResharderNoRefStream(t *testing.T) { - env := newTestResharderEnv([]string{"0"}, []string{"-80", "80-"}) + env := newTestResharderEnv(t, []string{"0"}, []string{"-80", "80-"}) defer env.close() schm := &tabletmanagerdatapb.SchemaDefinition{ @@ -438,7 +438,7 @@ func TestResharderNoRefStream(t *testing.T) { } func TestResharderCopySchema(t *testing.T) { - env := newTestResharderEnv([]string{"0"}, []string{"-80", "80-"}) + env := newTestResharderEnv(t, []string{"0"}, []string{"-80", "80-"}) defer env.close() schm := &tabletmanagerdatapb.SchemaDefinition{ @@ -482,7 +482,7 @@ func TestResharderCopySchema(t *testing.T) { } func TestResharderDupWorkflow(t *testing.T) { - env := newTestResharderEnv([]string{"0"}, []string{"-80", "80-"}) + env := newTestResharderEnv(t, []string{"0"}, []string{"-80", "80-"}) defer env.close() schm := &tabletmanagerdatapb.SchemaDefinition{ @@ -513,7 +513,7 @@ func TestResharderDupWorkflow(t *testing.T) { } func TestResharderServingState(t *testing.T) { - env := newTestResharderEnv([]string{"0"}, []string{"-80", "80-"}) + env := newTestResharderEnv(t, []string{"0"}, []string{"-80", "80-"}) defer env.close() schm := &tabletmanagerdatapb.SchemaDefinition{ @@ -555,7 +555,7 @@ func TestResharderServingState(t *testing.T) { } func TestResharderTargetAlreadyResharding(t *testing.T) { - env := newTestResharderEnv([]string{"0"}, []string{"-80", "80-"}) + env := newTestResharderEnv(t, []string{"0"}, []string{"-80", "80-"}) defer env.close() schm := &tabletmanagerdatapb.SchemaDefinition{ @@ -588,7 +588,7 @@ func TestResharderTargetAlreadyResharding(t *testing.T) { } func TestResharderUnnamedStream(t *testing.T) { - env := newTestResharderEnv([]string{"0"}, []string{"-80", "80-"}) + env := newTestResharderEnv(t, []string{"0"}, []string{"-80", "80-"}) defer env.close() schm := &tabletmanagerdatapb.SchemaDefinition{ @@ -636,7 +636,7 @@ func TestResharderUnnamedStream(t *testing.T) { } func TestResharderMismatchedRefStreams(t *testing.T) { - env := newTestResharderEnv([]string{"-80", "80-"}, []string{"0"}) + env := newTestResharderEnv(t, []string{"-80", "80-"}, []string{"0"}) defer env.close() schm := &tabletmanagerdatapb.SchemaDefinition{ @@ -703,7 +703,7 @@ func TestResharderMismatchedRefStreams(t *testing.T) { } func TestResharderTableNotInVSchema(t *testing.T) { - env := newTestResharderEnv([]string{"0"}, []string{"-80", "80-"}) + env := newTestResharderEnv(t, []string{"0"}, []string{"-80", "80-"}) defer env.close() schm := &tabletmanagerdatapb.SchemaDefinition{ @@ -740,7 +740,7 @@ func TestResharderTableNotInVSchema(t *testing.T) { } func TestResharderMixedTablesOrder1(t *testing.T) { - env := newTestResharderEnv([]string{"0"}, []string{"-80", "80-"}) + env := newTestResharderEnv(t, []string{"0"}, []string{"-80", "80-"}) defer env.close() schm := &tabletmanagerdatapb.SchemaDefinition{ @@ -807,7 +807,7 @@ func TestResharderMixedTablesOrder1(t *testing.T) { } func TestResharderMixedTablesOrder2(t *testing.T) { - env := newTestResharderEnv([]string{"0"}, []string{"-80", "80-"}) + env := newTestResharderEnv(t, []string{"0"}, []string{"-80", "80-"}) defer env.close() schm := &tabletmanagerdatapb.SchemaDefinition{ diff --git a/go/vt/wrangler/traffic_switcher.go b/go/vt/wrangler/traffic_switcher.go index 6e2960a17bd..461311ac1bd 100644 --- a/go/vt/wrangler/traffic_switcher.go +++ b/go/vt/wrangler/traffic_switcher.go @@ -465,11 +465,19 @@ func (wr *Wrangler) SwitchReads(ctx context.Context, targetKeyspace, workflow st } return sw.logs(), nil } - wr.Logger().Infof("switchShardReads: %+v, %+v, %+v", cells, servedTypes, direction) + wr.Logger().Infof("About to switchShardReads: %+v, %+v, %+v", cells, servedTypes, direction) if err := ts.switchShardReads(ctx, cells, servedTypes, direction); err != nil { ts.wr.Logger().Errorf("switchShardReads failed: %v", err) return nil, err } + + wr.Logger().Infof("switchShardReads Completed: %+v, %+v, %+v", cells, servedTypes, direction) + if err := wr.ts.ValidateSrvKeyspace(ctx, targetKeyspace, strings.Join(cells, ",")); err != nil { + err2 := vterrors.Wrapf(err, "After switching shard reads, found SrvKeyspace for %s is corrupt in cell %s", + targetKeyspace, strings.Join(cells, ",")) + log.Errorf("%w", err2) + return nil, err2 + } return sw.logs(), nil } @@ -1020,6 +1028,12 @@ func (ts *trafficSwitcher) switchShardReads(ctx context.Context, cells []string, } else { fromShards, toShards = ts.targetShards(), ts.sourceShards() } + if err := ts.wr.ts.ValidateSrvKeyspace(ctx, ts.targetKeyspace, strings.Join(cells, ",")); err != nil { + err2 := vterrors.Wrapf(err, "Before switching shard reads, found SrvKeyspace for %s is corrupt in cell %s", + ts.targetKeyspace, strings.Join(cells, ",")) + log.Errorf("%w", err2) + return err2 + } for _, servedType := range servedTypes { if err := ts.wr.updateShardRecords(ctx, ts.sourceKeyspace, fromShards, cells, servedType, true /* isFrom */, false /* clearSourceShards */); err != nil { return err @@ -1032,6 +1046,12 @@ func (ts *trafficSwitcher) switchShardReads(ctx context.Context, cells []string, return err } } + if err := ts.wr.ts.ValidateSrvKeyspace(ctx, ts.targetKeyspace, strings.Join(cells, ",")); err != nil { + err2 := vterrors.Wrapf(err, "After switching shard reads, found SrvKeyspace for %s is corrupt in cell %s", + ts.targetKeyspace, strings.Join(cells, ",")) + log.Errorf("%w", err2) + return err2 + } return nil } @@ -1374,6 +1394,11 @@ func (ts *trafficSwitcher) changeWriteRoute(ctx context.Context) error { } func (ts *trafficSwitcher) changeShardRouting(ctx context.Context) error { + if err := ts.wr.ts.ValidateSrvKeyspace(ctx, ts.targetKeyspace, ""); err != nil { + err2 := vterrors.Wrapf(err, "Before changing shard routes, found SrvKeyspace for %s is corrupt", ts.targetKeyspace) + log.Errorf("%w", err2) + return err2 + } err := ts.forAllSources(func(source *tsSource) error { _, err := ts.wr.ts.UpdateShardFields(ctx, ts.sourceKeyspace, source.si.ShardName(), func(si *topo.ShardInfo) error { si.IsMasterServing = false @@ -1394,7 +1419,16 @@ func (ts *trafficSwitcher) changeShardRouting(ctx context.Context) error { if err != nil { return err } - return ts.wr.ts.MigrateServedType(ctx, ts.targetKeyspace, ts.targetShards(), ts.sourceShards(), topodatapb.TabletType_MASTER, nil) + err = ts.wr.ts.MigrateServedType(ctx, ts.targetKeyspace, ts.targetShards(), ts.sourceShards(), topodatapb.TabletType_MASTER, nil) + if err != nil { + return err + } + if err := ts.wr.ts.ValidateSrvKeyspace(ctx, ts.targetKeyspace, ""); err != nil { + err2 := vterrors.Wrapf(err, "After changing shard routes, found SrvKeyspace for %s is corrupt", ts.targetKeyspace) + log.Errorf("%w", err2) + return err2 + } + return nil } func (ts *trafficSwitcher) startReverseVReplication(ctx context.Context) error {