Skip to content

Commit

Permalink
Merge pull request #7481 from planetscale/rn-reshard-validate-srvkeys…
Browse files Browse the repository at this point in the history
…pace

Validate SrvKeyspace during Reshard/SwitchReads
  • Loading branch information
rohit-nayak-ps authored Feb 19, 2021
2 parents 83b7408 + 962a33d commit 234a412
Show file tree
Hide file tree
Showing 8 changed files with 277 additions and 22 deletions.
39 changes: 39 additions & 0 deletions go/vt/topo/cell_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package topo

import (
"path"
"strings"

"context"

Expand Down Expand Up @@ -171,3 +172,41 @@ func (ts *Server) GetKnownCells(ctx context.Context) ([]string, error) {
}
return DirEntriesToStringArray(entries), nil
}

// ExpandCells takes a comma-separated list of cells and returns an array of cell names
// Aliases are expanded and an empty string returns all cells
func (ts *Server) ExpandCells(ctx context.Context, cells string) ([]string, error) {
var err error
var outputCells []string
inputCells := strings.Split(cells, ",")
if cells == "" {
inputCells, err = ts.GetCellInfoNames(ctx)
if err != nil {
return nil, err
}
}

for _, cell := range inputCells {
cell2 := strings.TrimSpace(cell)
shortCtx, cancel := context.WithTimeout(ctx, *RemoteOperationTimeout)
defer cancel()
_, err := ts.GetCellInfo(shortCtx, cell2, false)
if err != nil {
// not a valid cell, check whether it is a cell alias
shortCtx, cancel := context.WithTimeout(ctx, *RemoteOperationTimeout)
defer cancel()
alias, err2 := ts.GetCellsAlias(shortCtx, cell2, false)
// if we get an error, either cellAlias doesn't exist or it isn't a cell alias at all. Ignore and continue
if err2 == nil {
outputCells = append(outputCells, alias.Cells...)
}
if err != nil {
return nil, err
}
} else {
// valid cell, add it to our list
outputCells = append(outputCells, cell2)
}
}
return outputCells, nil
}
19 changes: 19 additions & 0 deletions go/vt/topo/srv_keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -701,3 +701,22 @@ func ShardIsServing(srvKeyspace *topodatapb.SrvKeyspace, shard *topodatapb.Shard
}
return false
}

// ValidateSrvKeyspace validates that the SrvKeyspace for given keyspace in the provided cells is not corrupted
func (ts *Server) ValidateSrvKeyspace(ctx context.Context, keyspace, cells string) error {
cellsToValidate, err := ts.ExpandCells(ctx, cells)
if err != nil {
return err
}
for _, cell := range cellsToValidate {
srvKeyspace, err := ts.GetSrvKeyspace(ctx, cell, keyspace)
if err != nil {
return err
}
err = OrderAndCheckPartitions(cell, srvKeyspace)
if err != nil {
return err
}
}
return nil
}
53 changes: 53 additions & 0 deletions go/vt/topo/topotests/cell_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ package topotests

import (
"fmt"
"strings"
"testing"

"github.com/stretchr/testify/require"

"context"

"vitess.io/vitess/go/vt/topo"
Expand All @@ -44,6 +47,11 @@ func TestCellInfo(t *testing.T) {
t.Fatalf("unexpected CellInfo: %v", ci)
}

var cells []string
cells, err = ts.ExpandCells(ctx, cell)
require.NoError(t, err)
require.EqualValues(t, []string{"cell1"}, cells)

// Update the Server Address.
if err := ts.UpdateCellInfoFields(ctx, cell, func(ci *topodatapb.CellInfo) error {
ci.ServerAddress = "new address"
Expand Down Expand Up @@ -124,3 +132,48 @@ func TestCellInfo(t *testing.T) {
t.Fatalf("GetCellInfo(non-existing cell) failed: %v", err)
}
}

func TestExpandCells(t *testing.T) {
ctx := context.Background()
var cells []string
var err error
var allCells = "cell1,cell2,cell3"
type testCase struct {
name string
cellsIn string
cellsOut []string
errString string
}

testCases := []testCase{
{"single", "cell1", []string{"cell1"}, ""},
{"multiple", "cell1,cell2,cell3", []string{"cell1", "cell2", "cell3"}, ""},
{"empty", "", []string{"cell1", "cell2", "cell3"}, ""},
{"bad", "unknown", nil, "node doesn't exist"},
}

for _, tCase := range testCases {
t.Run(tCase.name, func(t *testing.T) {
cellsIn := tCase.cellsIn
if cellsIn == "" {
cellsIn = allCells
}
topoCells := strings.Split(cellsIn, ",")
var ts *topo.Server
if tCase.name == "bad" {
ts = memorytopo.NewServer()
} else {
ts = memorytopo.NewServer(topoCells...)
}
cells, err = ts.ExpandCells(ctx, cellsIn)
if tCase.errString != "" {
require.Error(t, err)
require.Contains(t, err.Error(), tCase.errString)
} else {
require.NoError(t, err)
}
require.EqualValues(t, tCase.cellsOut, cells)
})
}

}
70 changes: 70 additions & 0 deletions go/vt/topo/topotests/srv_keyspace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"

"context"

"github.com/golang/protobuf/proto"
Expand Down Expand Up @@ -1170,3 +1172,71 @@ func TestMasterMigrateServedType(t *testing.T) {
t.Errorf("MigrateServedType() failure. Got %v, want: %v", string(got), string(want))
}
}

func TestValidateSrvKeyspace(t *testing.T) {
cell := "cell1"
cell2 := "cell2"
keyspace := "ks1"
ctx := context.Background()
ts := memorytopo.NewServer(cell, cell2)

leftKeyRange, err := key.ParseShardingSpec("-80")
if err != nil || len(leftKeyRange) != 1 {
t.Fatalf("ParseShardingSpec failed. Expected non error and only one element. Got err: %v, len(%v)", err, len(leftKeyRange))
}

rightKeyRange, err := key.ParseShardingSpec("80-")
if err != nil || len(rightKeyRange) != 1 {
t.Fatalf("ParseShardingSpec failed. Expected non error and only one element. Got err: %v, len(%v)", err, len(rightKeyRange))
}

correct := &topodatapb.SrvKeyspace{
Partitions: []*topodatapb.SrvKeyspace_KeyspacePartition{
{
ServedType: topodatapb.TabletType_MASTER,
ShardReferences: []*topodatapb.ShardReference{
{
Name: "-80",
KeyRange: leftKeyRange[0],
},
{
Name: "80-",
KeyRange: rightKeyRange[0],
},
},
},
},
}

incorrect := &topodatapb.SrvKeyspace{
Partitions: []*topodatapb.SrvKeyspace_KeyspacePartition{
{
ServedType: topodatapb.TabletType_MASTER,
ShardReferences: []*topodatapb.ShardReference{
{
Name: "80-",
KeyRange: rightKeyRange[0],
},
},
},
},
}

if err := ts.UpdateSrvKeyspace(ctx, cell, keyspace, correct); err != nil {
t.Fatalf("UpdateSrvKeyspace() failed: %v", err)
}

if err := ts.UpdateSrvKeyspace(ctx, cell2, keyspace, incorrect); err != nil {
t.Fatalf("UpdateSrvKeyspace() failed: %v", err)
}
errMsg := "keyspace partition for MASTER in cell cell2 does not start with min key"
err = ts.ValidateSrvKeyspace(ctx, keyspace, "cell1,cell2")
require.EqualError(t, err, errMsg)

err = ts.ValidateSrvKeyspace(ctx, keyspace, "cell1")
require.NoError(t, err)
err = ts.ValidateSrvKeyspace(ctx, keyspace, "cell2")
require.EqualError(t, err, errMsg)
err = ts.ValidateSrvKeyspace(ctx, keyspace, "")
require.EqualError(t, err, errMsg)
}
9 changes: 8 additions & 1 deletion go/vt/wrangler/resharder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ limitations under the License.
package wrangler

import (
"context"
"fmt"
"sync"
"time"

"context"
"vitess.io/vitess/go/vt/log"

"github.com/golang/protobuf/proto"
"github.com/pkg/errors"
Expand Down Expand Up @@ -67,6 +68,12 @@ func (wr *Wrangler) Reshard(ctx context.Context, keyspace, workflow string, sour
if err := wr.validateNewWorkflow(ctx, keyspace, workflow); err != nil {
return err
}
if err := wr.ts.ValidateSrvKeyspace(ctx, keyspace, cell); err != nil {
err2 := vterrors.Wrapf(err, "SrvKeyspace for keyspace %s is corrupt in cell %s", keyspace, cell)
log.Errorf("%w", err2)
return err2
}

rs, err := wr.buildResharder(ctx, keyspace, workflow, sources, targets, cell, tabletTypes)
if err != nil {
return vterrors.Wrap(err, "buildResharder")
Expand Down
37 changes: 35 additions & 2 deletions go/vt/wrangler/resharder_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ import (
"sync"
"testing"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/vt/key"

"context"

"vitess.io/vitess/go/sqltypes"
Expand Down Expand Up @@ -55,7 +59,36 @@ var (
//----------------------------------------------
// testResharderEnv

func newTestResharderEnv(sources, targets []string) *testResharderEnv {
func getPartition(t *testing.T, shards []string) *topodatapb.SrvKeyspace_KeyspacePartition {
partition := &topodatapb.SrvKeyspace_KeyspacePartition{
ServedType: topodatapb.TabletType_MASTER,
ShardReferences: []*topodatapb.ShardReference{},
}
for _, shard := range shards {
keyRange, err := key.ParseShardingSpec(shard)
require.NoError(t, err)
require.Equal(t, 1, len(keyRange))
partition.ShardReferences = append(partition.ShardReferences, &topodatapb.ShardReference{
Name: shard,
KeyRange: keyRange[0],
})
}
return partition
}
func initTopo(t *testing.T, topo *topo.Server, keyspace string, sources, targets, cells []string) {
ctx := context.Background()
srvKeyspace := &topodatapb.SrvKeyspace{
Partitions: []*topodatapb.SrvKeyspace_KeyspacePartition{},
}
srvKeyspace.Partitions = append(srvKeyspace.Partitions, getPartition(t, sources))
srvKeyspace.Partitions = append(srvKeyspace.Partitions, getPartition(t, targets))
for _, cell := range cells {
topo.UpdateSrvKeyspace(ctx, cell, keyspace, srvKeyspace)
}
topo.ValidateSrvKeyspace(ctx, keyspace, strings.Join(cells, ","))
}

func newTestResharderEnv(t *testing.T, sources, targets []string) *testResharderEnv {
env := &testResharderEnv{
keyspace: "ks",
workflow: "resharderTest",
Expand All @@ -67,7 +100,7 @@ func newTestResharderEnv(sources, targets []string) *testResharderEnv {
tmc: newTestResharderTMClient(),
}
env.wr = New(logutil.NewConsoleLogger(), env.topoServ, env.tmc)

initTopo(t, env.topoServ, "ks", sources, targets, []string{"cell"})
tabletID := 100
for _, shard := range sources {
_ = env.addTablet(tabletID, env.keyspace, shard, topodatapb.TabletType_MASTER)
Expand Down
Loading

0 comments on commit 234a412

Please sign in to comment.