Skip to content

Commit

Permalink
added ValidateVSchema and ValidateVSchemaKeyspace
Browse files Browse the repository at this point in the history
Signed-off-by: Malcolm Akinje <[email protected]>
  • Loading branch information
makinje16 committed May 10, 2021
1 parent 92da561 commit 3479947
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 33 deletions.
10 changes: 6 additions & 4 deletions go/vt/vtctl/vtctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,10 +396,10 @@ var commands = []commandGroup{
"[-concurrency=10] [-include_master=false] <keyspace>",
"Reloads the schema on all the tablets in a keyspace."},
{"ValidateSchemaShard", commandValidateSchemaShard,
"[-exclude_tables=''] [-include-views] <keyspace/shard>",
"[-exclude_tables=''] [-include-views] [-include-vschema] <keyspace/shard>",
"Validates that the master schema matches all of the replicas."},
{"ValidateSchemaKeyspace", commandValidateSchemaKeyspace,
"[-exclude_tables=''] [-include-views] [-skip-no-master] <keyspace name>",
"[-exclude_tables=''] [-include-views] [-skip-no-master] [-include-vschema] <keyspace name>",
"Validates that the master schema from shard 0 matches the schema on all of the other tablets in the keyspace."},
{"ApplySchema", commandApplySchema,
"[-allow_long_unavailability] [-wait_replicas_timeout=10s] [-ddl_strategy=<ddl_strategy>] [-request_context=<unique-request-context>] [-skip_preflight] {-sql=<sql> || -sql-file=<filename>} <keyspace>",
Expand Down Expand Up @@ -2768,6 +2768,7 @@ func commandReloadSchemaKeyspace(ctx context.Context, wr *wrangler.Wrangler, sub
func commandValidateSchemaShard(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
excludeTables := subFlags.String("exclude_tables", "", "Specifies a comma-separated list of tables to exclude. Each is either an exact match, or a regular expression of the form /regexp/")
includeViews := subFlags.Bool("include-views", false, "Includes views in the validation")
includeVSchema := subFlags.Bool("include-vschema", false, "Validate schemas against the vschema")
if err := subFlags.Parse(args); err != nil {
return err
}
Expand All @@ -2783,13 +2784,14 @@ func commandValidateSchemaShard(ctx context.Context, wr *wrangler.Wrangler, subF
if *excludeTables != "" {
excludeTableArray = strings.Split(*excludeTables, ",")
}
return wr.ValidateSchemaShard(ctx, keyspace, shard, excludeTableArray, *includeViews, false /*includeVSchema*/)
return wr.ValidateSchemaShard(ctx, keyspace, shard, excludeTableArray, *includeViews, *includeVSchema)
}

func commandValidateSchemaKeyspace(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
excludeTables := subFlags.String("exclude_tables", "", "Specifies a comma-separated list of tables to exclude. Each is either an exact match, or a regular expression of the form /regexp/")
includeViews := subFlags.Bool("include-views", false, "Includes views in the validation")
skipNoMaster := subFlags.Bool("skip-no-master", false, "Skip shards that don't have master when performing validation")
includeVSchema := subFlags.Bool("include-vschema", false, "Validate schemas against the vschema")
if err := subFlags.Parse(args); err != nil {
return err
}
Expand All @@ -2802,7 +2804,7 @@ func commandValidateSchemaKeyspace(ctx context.Context, wr *wrangler.Wrangler, s
if *excludeTables != "" {
excludeTableArray = strings.Split(*excludeTables, ",")
}
return wr.ValidateSchemaKeyspace(ctx, keyspace, excludeTableArray, *includeViews, *skipNoMaster)
return wr.ValidateSchemaKeyspace(ctx, keyspace, excludeTableArray, *includeViews, *skipNoMaster, *includeVSchema)
}

func commandApplySchema(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtctld/vtctld.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func InitVtctld(ts *topo.Server) {

actionRepo.RegisterKeyspaceAction("ValidateSchemaKeyspace",
func(ctx context.Context, wr *wrangler.Wrangler, keyspace string) (string, error) {
return "", wr.ValidateSchemaKeyspace(ctx, keyspace, nil, false, false)
return "", wr.ValidateSchemaKeyspace(ctx, keyspace, nil /*excludeTables*/, false /*includeViews*/, false /*skipNoMaster*/, false /*includeVSchema*/)
})

actionRepo.RegisterKeyspaceAction("ValidateVersionKeyspace",
Expand Down
73 changes: 58 additions & 15 deletions go/vt/wrangler/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,20 +160,9 @@ func (wr *Wrangler) ValidateSchemaShard(ctx context.Context, keyspace, shard str
}

if includeVSchema {
vschm, err := wr.ts.GetVSchema(ctx, keyspace)
err := wr.ValidateVSchema(ctx, keyspace, []string{shard}, excludeTables, includeViews)
if err != nil {
return fmt.Errorf("GetVSchema(%s) failed: %v", keyspace, err)
}
notFoundTables := []string{}

for _, tableDef := range masterSchema.TableDefinitions {
if _, ok := vschm.Tables[tableDef.Name]; !ok {
notFoundTables = append(notFoundTables, tableDef.Name)
}
}

if len(notFoundTables) > 0 {
return fmt.Errorf("Vschema Validation Failed: the following tables were not found in the vschema %v", notFoundTables)
return err
}
}

Expand Down Expand Up @@ -204,7 +193,7 @@ func (wr *Wrangler) ValidateSchemaShard(ctx context.Context, keyspace, shard str

// ValidateSchemaKeyspace will diff the schema from all the tablets in
// the keyspace.
func (wr *Wrangler) ValidateSchemaKeyspace(ctx context.Context, keyspace string, excludeTables []string, includeViews, skipNoMaster bool) error {
func (wr *Wrangler) ValidateSchemaKeyspace(ctx context.Context, keyspace string, excludeTables []string, includeViews, skipNoMaster bool, includeVSchema bool) error {
// find all the shards
shards, err := wr.ts.GetShardNames(ctx, keyspace)
if err != nil {
Expand All @@ -217,7 +206,7 @@ func (wr *Wrangler) ValidateSchemaKeyspace(ctx context.Context, keyspace string,
}
sort.Strings(shards)
if len(shards) == 1 {
return wr.ValidateSchemaShard(ctx, keyspace, shards[0], excludeTables, includeViews, false /*includeVSchema*/)
return wr.ValidateSchemaShard(ctx, keyspace, shards[0], excludeTables, includeViews, includeVSchema)
}

var referenceSchema *tabletmanagerdatapb.SchemaDefinition
Expand All @@ -227,6 +216,15 @@ func (wr *Wrangler) ValidateSchemaKeyspace(ctx context.Context, keyspace string,
er := concurrency.AllErrorRecorder{}
wg := sync.WaitGroup{}

// If we are checking against the vschema then all shards
// should just be validated individually against it
if includeVSchema {
err := wr.ValidateVSchema(ctx, keyspace, shards, excludeTables, includeViews)
if err != nil {
return err
}
}

// then diffs all tablets in the other shards
for _, shard := range shards[0:] {
si, err := wr.ts.GetShard(ctx, keyspace, shard)
Expand Down Expand Up @@ -273,6 +271,51 @@ func (wr *Wrangler) ValidateSchemaKeyspace(ctx context.Context, keyspace string,
return nil
}

// ValidateVSchema compares the schema of each primary tablet in "keyspace/shards..." to the vschema and errs if there are differences
func (wr *Wrangler) ValidateVSchema(ctx context.Context, keyspace string, shards []string, excludeTables []string, includeViews bool) error {
vschm, err := wr.ts.GetVSchema(ctx, keyspace)
if err != nil {
return fmt.Errorf("GetVSchema(%s) failed: %v", keyspace, err)
}

shardFailures := concurrency.AllErrorRecorder{}
var wg sync.WaitGroup
wg.Add(len(shards))

for _, shard := range shards {
go func(shard string) {
defer wg.Done()
notFoundTables := []string{}
si, err := wr.ts.GetShard(ctx, keyspace, shard)
if err != nil {
shardFailures.RecordError(fmt.Errorf("GetShard(%v, %v) failed: %v", keyspace, shard, err))
return
}
masterSchema, err := wr.GetSchema(ctx, si.MasterAlias, nil, excludeTables, includeViews)
if err != nil {
shardFailures.RecordError(fmt.Errorf("GetSchema(%s, nil, %v, %v) (%v/%v) failed: %v", si.MasterAlias.String(),
excludeTables, includeViews, keyspace, shard, err,
))
return
}
for _, tableDef := range masterSchema.TableDefinitions {
if _, ok := vschm.Tables[tableDef.Name]; !ok {
notFoundTables = append(notFoundTables, tableDef.Name)
}
}
if len(notFoundTables) > 0 {
shardFailure := fmt.Errorf("%v/%v has tables that are not in the vschema: %v", keyspace, shard, notFoundTables)
shardFailures.RecordError(shardFailure)
}
}(shard)
}
wg.Wait()
if shardFailures.HasErrors() {
return fmt.Errorf("ValidateVSchema(%v, %v, %v, %v) failed: %v", keyspace, shards, excludeTables, includeViews, shardFailures.Error().Error())
}
return nil
}

// PreflightSchema will try a schema change on the remote tablet.
func (wr *Wrangler) PreflightSchema(ctx context.Context, tabletAlias *topodatapb.TabletAlias, changes []string) ([]*tabletmanagerdatapb.SchemaChangeResult, error) {
ti, err := wr.ts.GetTablet(ctx, tabletAlias)
Expand Down
61 changes: 60 additions & 1 deletion go/vt/wrangler/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,67 @@ func TestValidateSchemaShard(t *testing.T) {
}
}

// Schema Checks
err := tme.wr.ValidateSchemaShard(ctx, "ks", "-80", nil /*excludeTables*/, true /*includeViews*/, true /*includeVSchema*/)
require.NoError(t, err)
shouldErr := tme.wr.ValidateSchemaShard(ctx, "ks", "80-", nil /*excludeTables*/, true /*includeViews*/, true /*includeVSchema*/)
require.Contains(t, shouldErr.Error(), "Vschema Validation Failed:")
require.Contains(t, shouldErr.Error(), "ks/80- has tables that are not in the vschema:")

// VSchema Specific Checks
err = tme.wr.ValidateVSchema(ctx, "ks", []string{"-80"}, nil /*excludeTables*/, true /*includeViews*/)
require.NoError(t, err)
shouldErr = tme.wr.ValidateVSchema(ctx, "ks", []string{"80-"}, nil /*excludeTables*/, true /*includeVoews*/)
require.Contains(t, shouldErr.Error(), "ks/80- has tables that are not in the vschema:")
}

func TestValidateSchemaKeyspace(t *testing.T) {
ctx := context.Background()
sourceShards := []string{"-80", "80-"}
targetShards := []string{"-40", "40-80", "80-c0", "c0-"}

tmePass := newTestShardMigrater(ctx, t, sourceShards, targetShards)
tmeDiffs := newTestShardMigrater(ctx, t, sourceShards, targetShards)

schm := &tabletmanagerdatapb.SchemaDefinition{
TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{
Name: "not_in_vschema",
Columns: []string{"c1", "c2"},
PrimaryKeyColumns: []string{"c1"},
Fields: sqltypes.MakeTestFields("c1|c2", "int64|int64"),
}},
}

// This is the vschema returned by newTestShardMigrater
sameAsVSchema := &tabletmanagerdatapb.SchemaDefinition{
TableDefinitions: []*tabletmanagerdatapb.TableDefinition{
{
Name: "t1",
Columns: []string{"c1"},
},
{
Name: "t2",
Columns: []string{"c1"},
},
{
Name: "t3",
Columns: []string{"c1"},
},
},
}

for _, primary := range append(tmePass.sourceMasters, tmePass.targetMasters...) {
primary.FakeMysqlDaemon.Schema = sameAsVSchema
}

for _, primary := range append(tmeDiffs.sourceMasters, tmeDiffs.targetMasters...) {
primary.FakeMysqlDaemon.Schema = schm
}

// Schema Checks
err := tmePass.wr.ValidateSchemaKeyspace(ctx, "ks", nil /*excludeTables*/, true /*includeViews*/, true /*skipNoMaster*/, true /*includeVSchema*/)
require.NoError(t, err)
err = tmePass.wr.ValidateSchemaKeyspace(ctx, "ks", nil /*excludeTables*/, true /*includeViews*/, true /*skipNoMaster*/, false /*includeVSchema*/)
require.NoError(t, err)
shouldErr := tmeDiffs.wr.ValidateSchemaKeyspace(ctx, "ks", nil /*excludeTables*/, true /*includeViews*/, true /*skipNoMaster*/, true /*includeVSchema*/)
require.Error(t, shouldErr)
}
14 changes: 3 additions & 11 deletions go/vt/wrangler/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,17 +178,9 @@ func (vrw *VReplicationWorkflow) Create(ctx context.Context) error {
excludeTables := strings.Split(vrw.params.ExcludeTables, ",")
keyspace := vrw.params.SourceKeyspace

errs := []string{}
for _, shard := range vrw.params.SourceShards {
if err := vrw.wr.ValidateSchemaShard(ctx, keyspace, shard, excludeTables, true /*includeViews*/, true /*includeVschema*/); err != nil {
errMsg := fmt.Sprintf("%s/%s: %s", keyspace, shard, err.Error())
errs = append(errs, errMsg)
}
}

// There were some schema drifts
if len(errs) > 0 {
return fmt.Errorf("Create ReshardWorkflow failed Schema Validation:\n" + strings.Join(errs, "\n"))
vschmErr := vrw.wr.ValidateVSchema(ctx, keyspace, vrw.params.SourceShards, excludeTables, true /*includeViews*/)
if vschmErr != nil {
return fmt.Errorf("Create ReshardWorkflow failed: %v", vschmErr)
}

err = vrw.initReshard()
Expand Down
2 changes: 1 addition & 1 deletion go/vt/wrangler/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ func TestVRWSchemaValidation(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, vrwf)
shouldErr := vrwf.Create(ctx)
require.Contains(t, shouldErr.Error(), "Create ReshardWorkflow failed Schema Validation")
require.Contains(t, shouldErr.Error(), "Create ReshardWorkflow failed: ValidateVSchema")
}

func TestReshardV2Cancel(t *testing.T) {
Expand Down

0 comments on commit 3479947

Please sign in to comment.