Skip to content

Commit

Permalink
sql/catalog: add optional validation that primary indexes store all c…
Browse files Browse the repository at this point in the history
…olumns

Previously, we had a bug where it was possible that the primary index
was not storing some non-virtual columns. Unfortunately, adding
validation in all cases would break existing customers, so as
a safety we have added validation to crdb_internal.invalid_objects
and the debug doctor command. To address this, this patch adds a
new validation level only for invalid_objects / doctor.

Fixes: #106794
Informs: #106739

Release note: None
  • Loading branch information
fqazi authored and rafiss committed Jul 18, 2023
1 parent 05d06e5 commit 0de61dd
Show file tree
Hide file tree
Showing 8 changed files with 103 additions and 3 deletions.
9 changes: 9 additions & 0 deletions pkg/sql/catalog/internal/validate/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ const (

// Write is the default validation level when writing a descriptor.
Write = catalog.ValidationLevelAllPreTxnCommit

// ForUpgrade is the validation level used by doctor, which includes
// extra checks for upgrade compatibility.
ForUpgrade = catalog.ValidationLevelExtraForUpgrade
)

// Self is a convenience function for Validate called at the
Expand Down Expand Up @@ -216,6 +220,11 @@ func (vea *validationErrorAccumulator) IsActive(version clusterversion.Key) bool
return vea.activeVersion.IsActive(version)
}

// HasExtraChecksForUpgrade implements catalog.ValidationErrorAccumulator.
// It reports whether validation was requested at the upgrade level, which
// enables the optional checks that gate upgrade compatibility.
func (vea *validationErrorAccumulator) HasExtraChecksForUpgrade() bool {
	switch vea.targetLevel {
	case catalog.ValidationLevelExtraForUpgrade:
		return true
	default:
		return false
	}
}

func (vea *validationErrorAccumulator) reportDescGetterError(
state validationErrorAccumulatorState, err error,
) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/catalog/nstree/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ func (c Catalog) ValidateWithRecover(
ve = append(ve, err)
}
}()
return c.Validate(ctx, version, catalog.NoValidationTelemetry, validate.Write, desc)
return c.Validate(ctx, version, catalog.NoValidationTelemetry, validate.ForUpgrade, desc)
}

// ByteSize returns memory usage of the underlying map in bytes.
Expand Down
10 changes: 10 additions & 0 deletions pkg/sql/catalog/randgen/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,16 @@ func (g *testSchemaGenerator) genOneTable(
colName := ng.GenerateOne(0)
tmpl.desc.Columns[i+1].Name = colName
tmpl.desc.Families[0].ColumnNames[i+1] = colName
for j := range tmpl.desc.PrimaryIndex.KeyColumnNames {
if tmpl.desc.PrimaryIndex.KeyColumnIDs[j] == tmpl.desc.Columns[i+1].ID {
tmpl.desc.PrimaryIndex.KeyColumnNames[j] = colName
}
}
for j := range tmpl.desc.PrimaryIndex.StoreColumnNames {
if tmpl.desc.PrimaryIndex.StoreColumnIDs[j] == tmpl.desc.Columns[i+1].ID {
tmpl.desc.PrimaryIndex.StoreColumnNames[j] = colName
}
}
}
ng := randident.NewNameGenerator(&nameGenCfg, g.rand, "primary")
idxName := ng.GenerateOne(0)
Expand Down
24 changes: 24 additions & 0 deletions pkg/sql/catalog/randgen/templates.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,22 @@ outer:
t.desc.Families[0].ColumnIDs, colID)
t.desc.Families[0].ColumnNames = append(
t.desc.Families[0].ColumnNames, newColDef.Name)
// Add to the primary index as either a key or store column.
for i, name := range origDesc.PrimaryIndex.KeyColumnNames {
if name == origColDef.Name {
t.desc.PrimaryIndex.KeyColumnIDs = append(t.desc.PrimaryIndex.KeyColumnIDs, colID)
t.desc.PrimaryIndex.KeyColumnNames = append(t.desc.PrimaryIndex.KeyColumnNames, name)
t.desc.PrimaryIndex.KeyColumnDirections = append(t.desc.PrimaryIndex.KeyColumnDirections, origDesc.PrimaryIndex.KeyColumnDirections[i])
break
}
}
for _, name := range origDesc.PrimaryIndex.StoreColumnNames {
if name == origColDef.Name {
t.desc.PrimaryIndex.StoreColumnIDs = append(t.desc.PrimaryIndex.StoreColumnIDs, colID)
t.desc.PrimaryIndex.StoreColumnNames = append(t.desc.PrimaryIndex.StoreColumnNames, name)
break
}
}
}
g.models.tb = append(g.models.tb, t)
}
Expand Down Expand Up @@ -192,6 +208,14 @@ func defaultTemplate() tbTemplate {
t.desc.Families[0].ColumnIDs, colID)
t.desc.Families[0].ColumnNames = append(
t.desc.Families[0].ColumnNames, colName)
if colID == 0 {
t.desc.PrimaryIndex.KeyColumnIDs = []descpb.ColumnID{colID}
t.desc.PrimaryIndex.KeyColumnNames = []string{colName}
t.desc.PrimaryIndex.KeyColumnDirections = []catenumpb.IndexColumn_Direction{catenumpb.IndexColumn_ASC}
} else {
t.desc.PrimaryIndex.StoreColumnIDs = append(t.desc.PrimaryIndex.StoreColumnIDs, colID)
t.desc.PrimaryIndex.StoreColumnNames = append(t.desc.PrimaryIndex.StoreColumnNames, colName)
}
}
return t
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/catalog/tabledesc/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ func (cea *constraintValidationErrorAccumulator) IsActive(version clusterversion
return true
}

// HasExtraChecksForUpgrade implements catalog.ValidationErrorAccumulator.
// The test accumulator unconditionally opts into the upgrade-only checks
// so that they are exercised by unit tests.
func (cea *constraintValidationErrorAccumulator) HasExtraChecksForUpgrade() bool {
	const alwaysRunUpgradeChecks = true
	return alwaysRunUpgradeChecks
}

func ValidateConstraints(immI catalog.TableDescriptor) error {
imm, ok := immI.(*immutable)
if !ok {
Expand Down
40 changes: 38 additions & 2 deletions pkg/sql/catalog/tabledesc/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -753,7 +753,7 @@ func (desc *wrapper) ValidateSelf(vea catalog.ValidationErrorAccumulator) {
desc.validateColumnFamilies(columnsByID),
desc.validateCheckConstraints(columnsByID),
desc.validateUniqueWithoutIndexConstraints(columnsByID),
desc.validateTableIndexes(columnsByID),
desc.validateTableIndexes(columnsByID, vea.HasExtraChecksForUpgrade()),
desc.validatePartitioning(),
}
hasErrs := false
Expand Down Expand Up @@ -1274,7 +1274,9 @@ func (desc *wrapper) validateUniqueWithoutIndexConstraints(
// IDs are unique, and the family of the primary key is 0. This does not check
// if indexes are unique (i.e. same set of columns, direction, and uniqueness)
// as there are practical uses for them.
func (desc *wrapper) validateTableIndexes(columnsByID map[descpb.ColumnID]catalog.Column) error {
func (desc *wrapper) validateTableIndexes(
columnsByID map[descpb.ColumnID]catalog.Column, upgradeChecks bool,
) error {
if len(desc.PrimaryIndex.KeyColumnIDs) == 0 {
return ErrMissingPrimaryKey
}
Expand Down Expand Up @@ -1547,6 +1549,40 @@ func (desc *wrapper) validateTableIndexes(columnsByID map[descpb.ColumnID]catalo
idx.GetName(), colID, foundIn)
}
}
// Checks after this point are for future release. Also exclude system
// tables from these checks.
if !upgradeChecks {
continue
}
// Check that each non-virtual column is in the key or store columns of
// the primary index.
if idx.Primary() {
keyCols := idx.CollectKeyColumnIDs()
storeCols := idx.CollectPrimaryStoredColumnIDs()
for _, col := range desc.PublicColumns() {
if col.IsVirtual() {
if storeCols.Contains(col.GetID()) {
return errors.Newf(
"primary index %q store columns cannot contain virtual column ID %d",
idx.GetName(), col.GetID(),
)
}
// No need to check anything else for virtual columns.
continue
}
if !keyCols.Contains(col.GetID()) && !storeCols.Contains(col.GetID()) {
return errors.Newf(
"primary index %q must contain column ID %d in either key or store columns",
idx.GetName(), col.GetID(),
)
}
}
} else if len(desc.Mutations) == 0 && desc.DeclarativeSchemaChangerState == nil {
if idx.IndexDesc().EncodingType != catenumpb.SecondaryIndexEncoding {
return errors.AssertionFailedf("secondary index %q has invalid encoding type %d in proto, expected %d",
idx.GetName(), idx.IndexDesc().EncodingType, catenumpb.SecondaryIndexEncoding)
}
}
}
return nil
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/sql/catalog/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ const (
// Errors accumulated when validating up to this level come with additional
// telemetry.
ValidationLevelAllPreTxnCommit
// ValidationLevelExtraForUpgrade means do all of the above, but
// apply optional checks which aim at checking upgrade compatibility.
ValidationLevelExtraForUpgrade
)

// ValidationTelemetry defines the kind of telemetry keys to add to the errors.
Expand Down Expand Up @@ -71,6 +74,10 @@ type ValidationErrorAccumulator interface {
// be enabled, by comparing against the active version. If the active version
// is unknown the validation in question will always be enabled.
IsActive(version clusterversion.Key) bool

// HasExtraChecksForUpgrade returns if extra checks required for upgrades
// should be performed.
HasExtraChecksForUpgrade() bool
}

// ValidationErrors is the error container returned by Validate which contains
Expand Down
9 changes: 9 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/drop_index
Original file line number Diff line number Diff line change
Expand Up @@ -556,3 +556,12 @@ FROM
# Drop INDEX will be blocked
statement error pq: index t_with_idx_data_loss_n_idx cannot be safely dropped, since doing so will lose data in certain columns
DROP INDEX t_with_idx_data_loss@t_with_idx_data_loss_n_idx;

# Validate this object is seen as invalid.
query TT
SELECT obj_name, error FROM crdb_internal.invalid_objects
----
t_with_idx_data_loss relation "t_with_idx_data_loss" (125): primary index "t_with_idx_data_loss_pkey" must contain column ID 2 in either key or store columns

statement ok
DROP TABLE t_with_idx_data_loss;

0 comments on commit 0de61dd

Please sign in to comment.