Skip to content

Commit

Permalink
schemadiff: validate and apply foreign key indexes (#12026)
Browse files Browse the repository at this point in the history
* more testing for getViewDependentTableNames()

Signed-off-by: Shlomi Noach <[email protected]>

* getForeignKeyParentTableNames

Signed-off-by: Shlomi Noach <[email protected]>

* schema normalization: sort tables by fk reference order

Signed-off-by: Shlomi Noach <[email protected]>

* minor test addition

Signed-off-by: Shlomi Noach <[email protected]>

* ForeignKeyDependencyUnresolvedError

Signed-off-by: Shlomi Noach <[email protected]>

* InvalidReferencedColumnInForeignKeyConstraintError, MismatchingForeignKeyColumnCountError, MismatchingForeignKeyColumnTypeError

Signed-off-by: Shlomi Noach <[email protected]>

* validate or autogenerate index on local table based on foreign key constraint columns

Signed-off-by: Shlomi Noach <[email protected]>

* validate referenced table key

Signed-off-by: Shlomi Noach <[email protected]>

* ColumnKeyOption consts become public

Signed-off-by: Shlomi Noach <[email protected]>

* adding DuplicateKeyNameError

Signed-off-by: Shlomi Noach <[email protected]>

* normalize PRIMARY KEY definition: a 'id int primary key' column definition splits into 'id int' and 'primary key (id)' parts. Primary key is always the first key in the list of keys. Handle queries such as 'modify id int primary key' (validate there isn't already a primary key, or that the existing primary key is over 'id'. add column with primary key definition...

Signed-off-by: Shlomi Noach <[email protected]>

* further unit test adaptations

Signed-off-by: Shlomi Noach <[email protected]>

* adapting tests: adding required local index over foreign key columns

Signed-off-by: Shlomi Noach <[email protected]>

* implicitly add key over foreign key columns if no appropriate key exists

Signed-off-by: Shlomi Noach <[email protected]>

* normalize CREATE TABLE statement to include missing foreign key indexes

Signed-off-by: Shlomi Noach <[email protected]>

* rename error: ForeignKeyColumnCountMismatchError

Signed-off-by: Shlomi Noach <[email protected]>

* rename MismatchingForeignKeyColumnTypeError to ForeignKeyColumnTypeMismatchError

Signed-off-by: Shlomi Noach <[email protected]>

* changes per review

Signed-off-by: Shlomi Noach <[email protected]>

* add a few tests that validate an implicit index isn't added if not needed

Signed-off-by: Shlomi Noach <[email protected]>

* case insensitive column comparison in indexCoversColumnsInOrder and in tests

Signed-off-by: Shlomi Noach <[email protected]>

* MissingForeignKeyIndexError is not needed bcause, compatible with MySQL behavior, we implicitly add the index if missing

Signed-off-by: Shlomi Noach <[email protected]>

---------

Signed-off-by: Shlomi Noach <[email protected]>
  • Loading branch information
shlomi-noach authored Jan 29, 2023
1 parent f33b862 commit ae8406d
Show file tree
Hide file tree
Showing 5 changed files with 372 additions and 46 deletions.
26 changes: 26 additions & 0 deletions go/vt/schemadiff/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,32 @@ func (e *ForeignKeyColumnTypeMismatchError) Error() string {
)
}

type MissingForeignKeyReferencedIndexError struct {
Table string
Constraint string
ReferencedTable string
}

func (e *MissingForeignKeyReferencedIndexError) Error() string {
return fmt.Sprintf("missing index in referenced table %s for foreign key constraint %s in table %s",
sqlescape.EscapeID(e.ReferencedTable),
sqlescape.EscapeID(e.Constraint),
sqlescape.EscapeID(e.Table),
)
}

type IndexNeededByForeignKeyError struct {
Table string
Key string
}

func (e *IndexNeededByForeignKeyError) Error() string {
return fmt.Sprintf("key %s needed by a foreign key constraint in table %s",
sqlescape.EscapeID(e.Key),
sqlescape.EscapeID(e.Table),
)
}

type ViewDependencyUnresolvedError struct {
View string
}
Expand Down
4 changes: 3 additions & 1 deletion go/vt/schemadiff/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,9 @@ func (s *Schema) normalize() error {
}
}

// TODO(shlomi): find a valid index
if !referencedTable.columnsCoveredByInOrderIndex(check.ReferenceDefinition.ReferencedColumns) {
return &MissingForeignKeyReferencedIndexError{Table: t.Name(), Constraint: cs.Name.String(), ReferencedTable: referencedTableName}
}
}
}
return nil
Expand Down
36 changes: 20 additions & 16 deletions go/vt/schemadiff/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,15 +249,15 @@ func TestGetForeignKeyParentTableNames(t *testing.T) {

func TestTableForeignKeyOrdering(t *testing.T) {
fkQueries := []string{
"create table t11 (id int primary key, i int, constraint f12 foreign key (i) references t12(id) on delete restrict, constraint f20 foreign key (i) references t20(id) on delete restrict)",
"create table t11 (id int primary key, i int, key ix (i), constraint f12 foreign key (i) references t12(id) on delete restrict, constraint f20 foreign key (i) references t20(id) on delete restrict)",
"create table t15(id int, primary key(id))",
"create view v09 as select * from v13, t17",
"create table t20 (id int primary key, i int, constraint f15 foreign key (i) references t15(id) on delete restrict)",
"create table t20 (id int primary key, i int, key ix (i), constraint f15 foreign key (i) references t15(id) on delete restrict)",
"create view v13 as select * from t20",
"create table t12 (id int primary key, i int, constraint f15 foreign key (i) references t15(id) on delete restrict)",
"create table t17 (id int primary key, i int, constraint f11 foreign key (i) references t11(id) on delete restrict, constraint f15 foreign key (i) references t15(id) on delete restrict)",
"create table t16 (id int primary key, i int, constraint f11 foreign key (i) references t11(id) on delete restrict, constraint f15 foreign key (i) references t15(id) on delete restrict)",
"create table t14 (id int primary key, i int, constraint f14 foreign key (i) references t14(id) on delete restrict)",
"create table t12 (id int primary key, i int, key ix (i), constraint f15 foreign key (i) references t15(id) on delete restrict)",
"create table t17 (id int primary key, i int, key ix (i), constraint f11 foreign key (i) references t11(id) on delete restrict, constraint f15 foreign key (i) references t15(id) on delete restrict)",
"create table t16 (id int primary key, i int, key ix (i), constraint f11 foreign key (i) references t11(id) on delete restrict, constraint f15 foreign key (i) references t15(id) on delete restrict)",
"create table t14 (id int primary key, i int, key ix (i), constraint f14 foreign key (i) references t14(id) on delete restrict)",
}
expectSortedTableNames := []string{
"t14",
Expand Down Expand Up @@ -287,10 +287,10 @@ func TestInvalidSchema(t *testing.T) {
expectErr error
}{
{
schema: "create table t11 (id int primary key, i int, constraint f11 foreign key (i) references t11(id) on delete restrict)",
schema: "create table t11 (id int primary key, i int, key ix(i), constraint f11 foreign key (i) references t11(id) on delete restrict)",
},
{
schema: "create table t10(id int primary key); create table t11 (id int primary key, i int, constraint f10 foreign key (i) references t10(id) on delete restrict)",
schema: "create table t10(id int primary key); create table t11 (id int primary key, i int, key ix(i), constraint f10 foreign key (i) references t10(id) on delete restrict)",
},
{
schema: "create table t11 (id int primary key, i int, constraint f11 foreign key (i7) references t11(id) on delete restrict)",
Expand All @@ -309,35 +309,39 @@ func TestInvalidSchema(t *testing.T) {
expectErr: &ForeignKeyDependencyUnresolvedError{Table: "t11"},
},
{
schema: "create table t11 (id int primary key, i int, constraint f11 foreign key (i) references t11(id2) on delete restrict)",
schema: "create table t11 (id int primary key, i int, key ix(i), constraint f11 foreign key (i) references t11(id2) on delete restrict)",
expectErr: &InvalidReferencedColumnInForeignKeyConstraintError{Table: "t11", Constraint: "f11", ReferencedTable: "t11", ReferencedColumn: "id2"},
},
{
schema: "create table t10(id int primary key); create table t11 (id int primary key, i int, constraint f10 foreign key (i) references t10(x) on delete restrict)",
schema: "create table t10(id int primary key); create table t11 (id int primary key, i int, key ix(i), constraint f10 foreign key (i) references t10(x) on delete restrict)",
expectErr: &InvalidReferencedColumnInForeignKeyConstraintError{Table: "t11", Constraint: "f10", ReferencedTable: "t10", ReferencedColumn: "x"},
},
{
schema: "create table t10(id int primary key); create table t11 (id int primary key, i int unsigned, constraint f10 foreign key (i) references t10(id) on delete restrict)",
schema: "create table t10(id int primary key, i int); create table t11 (id int primary key, i int, key ix(i), constraint f10 foreign key (i) references t10(i) on delete restrict)",
expectErr: &MissingForeignKeyReferencedIndexError{Table: "t11", Constraint: "f10", ReferencedTable: "t10"},
},
{
schema: "create table t10(id int primary key); create table t11 (id int primary key, i int unsigned, key ix(i), constraint f10 foreign key (i) references t10(id) on delete restrict)",
expectErr: &ForeignKeyColumnTypeMismatchError{Table: "t11", Constraint: "f10", Column: "i", ReferencedTable: "t10", ReferencedColumn: "id"},
},
{
schema: "create table t10(id int primary key); create table t11 (id int primary key, i bigint, constraint f10 foreign key (i) references t10(id) on delete restrict)",
schema: "create table t10(id int primary key); create table t11 (id int primary key, i bigint, key ix(i), constraint f10 foreign key (i) references t10(id) on delete restrict)",
expectErr: &ForeignKeyColumnTypeMismatchError{Table: "t11", Constraint: "f10", Column: "i", ReferencedTable: "t10", ReferencedColumn: "id"},
},
{
schema: "create table t10(id bigint primary key); create table t11 (id int primary key, i int, constraint f10 foreign key (i) references t10(id) on delete restrict)",
schema: "create table t10(id bigint primary key); create table t11 (id int primary key, i int, key ix(i), constraint f10 foreign key (i) references t10(id) on delete restrict)",
expectErr: &ForeignKeyColumnTypeMismatchError{Table: "t11", Constraint: "f10", Column: "i", ReferencedTable: "t10", ReferencedColumn: "id"},
},
{
schema: "create table t10(id bigint primary key); create table t11 (id int primary key, i varchar(100), constraint f10 foreign key (i) references t10(id) on delete restrict)",
schema: "create table t10(id bigint primary key); create table t11 (id int primary key, i varchar(100), key ix(i), constraint f10 foreign key (i) references t10(id) on delete restrict)",
expectErr: &ForeignKeyColumnTypeMismatchError{Table: "t11", Constraint: "f10", Column: "i", ReferencedTable: "t10", ReferencedColumn: "id"},
},
{
// InnoDB allows different string length
schema: "create table t10(id varchar(50) primary key); create table t11 (id int primary key, i varchar(100), constraint f10 foreign key (i) references t10(id) on delete restrict)",
schema: "create table t10(id varchar(50) primary key); create table t11 (id int primary key, i varchar(100), key ix(i), constraint f10 foreign key (i) references t10(id) on delete restrict)",
},
{
schema: "create table t10(id varchar(50) charset utf8mb3 primary key); create table t11 (id int primary key, i varchar(100) charset utf8mb4, constraint f10 foreign key (i) references t10(id) on delete restrict)",
schema: "create table t10(id varchar(50) charset utf8mb3 primary key); create table t11 (id int primary key, i varchar(100) charset utf8mb4, key ix(i), constraint f10 foreign key (i) references t10(id) on delete restrict)",
expectErr: &ForeignKeyColumnTypeMismatchError{Table: "t11", Constraint: "f10", Column: "i", ReferencedTable: "t10", ReferencedColumn: "id"},
},
}
Expand Down
125 changes: 106 additions & 19 deletions go/vt/schemadiff/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,9 @@ func NewCreateTableEntity(c *sqlparser.CreateTable) (*CreateTableEntity, error)
// - table option case (upper/lower/special)
// The function returns this receiver as courtesy
func (c *CreateTableEntity) normalize() *CreateTableEntity {
c.normalizeKeys()
c.normalizePrimaryKeyColumns()
c.normalizeForeignKeyIndexes() // implicitly add missing indexes for foreign keys
c.normalizeKeys() // assign names to keys
c.normalizeUnnamedConstraints()
c.normalizeTableOptions()
c.normalizeColumnOptions()
Expand Down Expand Up @@ -622,7 +624,7 @@ func (c *CreateTableEntity) normalizeKeys() {
}

func (c *CreateTableEntity) normalizeUnnamedConstraints() {
// let's ensure all keys have names
// let's ensure all constraints have names
constraintNameExists := map[string]bool{}
// first, we iterate and take note for all keys that do already have names
for _, constraint := range c.CreateTable.TableSpec.Constraints {
Expand Down Expand Up @@ -650,6 +652,34 @@ func (c *CreateTableEntity) normalizeUnnamedConstraints() {
}
}

func (c *CreateTableEntity) normalizeForeignKeyIndexes() {
for _, constraint := range c.CreateTable.TableSpec.Constraints {
fk, ok := constraint.Details.(*sqlparser.ForeignKeyDefinition)
if !ok {
continue
}
if !c.columnsCoveredByInOrderIndex(fk.Source) {
// We add a foreign key, but the local FK columns are not indexed.
// MySQL's behavior is to implicitly add an index that covers the foreign key's local columns.
// The name of the index is either:
// - the same name of the constraint, if such name is provided
// - and error if an index by this name exists
// - or, a standard auto-generated index name, if the constraint name is not provided
indexDefinition := &sqlparser.IndexDefinition{
Info: &sqlparser.IndexInfo{
Type: "key",
Name: constraint.Name, // if name is empty, then the name is later auto populated
},
}
for _, col := range fk.Source {
indexColumn := &sqlparser.IndexColumn{Column: col}
indexDefinition.Columns = append(indexDefinition.Columns, indexColumn)
}
c.TableSpec.Indexes = append(c.TableSpec.Indexes, indexDefinition)
}
}
}

// Name implements Entity interface
func (c *CreateTableEntity) Name() string {
return c.CreateTable.GetTable().Name.String()
Expand Down Expand Up @@ -1758,6 +1788,21 @@ func (c *CreateTableEntity) apply(diff *AlterTableEntityDiff) error {
if !found {
return &ApplyKeyNotFoundError{Table: c.Name(), Key: opt.Name.String()}
}

// Now, if this is a normal key being dropped, let's validate it does not leave any foreign key constraint uncovered
switch opt.Type {
case sqlparser.PrimaryKeyType, sqlparser.NormalKeyType:
for _, cs := range c.CreateTable.TableSpec.Constraints {
fk, ok := cs.Details.(*sqlparser.ForeignKeyDefinition)
if !ok {
continue
}
if !c.columnsCoveredByInOrderIndex(fk.Source) {
return &IndexNeededByForeignKeyError{Table: c.Name(), Key: opt.Name.String()}
}
}
}

case *sqlparser.AddIndexDefinition:
// validate no existing key by same name
keyName := opt.IndexDefinition.Info.Name.String()
Expand Down Expand Up @@ -1990,6 +2035,7 @@ func (c *CreateTableEntity) Apply(diff EntityDiff) (Entity, error) {
// - edit or remove keys if referenced columns are dropped
// - drop check constraints for a single specific column if that column
// is the only referenced column in that check constraint.
// - add implicit keys for foreign key constraint, if needed
func (c *CreateTableEntity) postApplyNormalize() error {
// reduce or remove keys based on existing column list
// (a column may have been removed)postApplyNormalize
Expand Down Expand Up @@ -2051,6 +2097,10 @@ func (c *CreateTableEntity) postApplyNormalize() error {
}
c.CreateTable.TableSpec.Constraints = keptConstraints

c.normalizePrimaryKeyColumns()
c.normalizeForeignKeyIndexes()
c.normalizeKeys()

return nil
}

Expand Down Expand Up @@ -2081,6 +2131,44 @@ func getKeyColumnNames(key *sqlparser.IndexDefinition) (colNames map[string]bool
return colNames
}

// indexCoversColumnsInOrder checks if the given index covers the given columns in order and in prefix.
// the index must either covers the exact list of columns or continue to cover additional columns beyond.
// Used for validating indexes covering foreign keys.
func indexCoversColumnsInOrder(index *sqlparser.IndexDefinition, columns sqlparser.Columns) bool {
if len(columns) == 0 {
return false
}
if len(index.Columns) < len(columns) {
// obviously the index doesn't cover the required columns
return false
}
for i, col := range columns {
// the index must cover same columns, in order, wih possibly more columns covered than requested.
indexCol := index.Columns[i]
if !strings.EqualFold(col.String(), indexCol.Column.String()) {
return false
}
}
return true
}

// indexesCoveringForeignKeyColumns returns a list of indexes that cover a given list of coumns, in-oder and in prefix.
// Used for validating indexes covering foreign keys.
func (c *CreateTableEntity) indexesCoveringForeignKeyColumns(columns sqlparser.Columns) (indexes []*sqlparser.IndexDefinition) {
for _, index := range c.CreateTable.TableSpec.Indexes {
if indexCoversColumnsInOrder(index, columns) {
indexes = append(indexes, index)
}
}
return indexes
}

// columnsCoveredByInOrderIndex returns 'true' when there is at least one index that covers the given
// list of columns in-order and in-prefix.
func (c *CreateTableEntity) columnsCoveredByInOrderIndex(columns sqlparser.Columns) bool {
return len(c.indexesCoveringForeignKeyColumns(columns)) > 0
}

func (c *CreateTableEntity) validateDuplicateKeyNameError() error {
keyNames := map[string]bool{}
for _, key := range c.CreateTable.TableSpec.Indexes {
Expand All @@ -2104,6 +2192,22 @@ func (c *CreateTableEntity) validate() error {
}
columnExists[colName] = true
}
// validate all columns used by foreign key constraints do in fact exist,
// and that there exists an index over those columns
for _, cs := range c.CreateTable.TableSpec.Constraints {
fk, ok := cs.Details.(*sqlparser.ForeignKeyDefinition)
if !ok {
continue
}
if len(fk.Source) != len(fk.ReferenceDefinition.ReferencedColumns) {
return &ForeignKeyColumnCountMismatchError{Table: c.Name(), Constraint: cs.Name.String(), ColumnCount: len(fk.Source), ReferencedTable: fk.ReferenceDefinition.ReferencedTable.Name.String(), ReferencedColumnCount: len(fk.ReferenceDefinition.ReferencedColumns)}
}
for _, col := range fk.Source {
if !columnExists[col.Lowered()] {
return &InvalidColumnInForeignKeyConstraintError{Table: c.Name(), Constraint: cs.Name.String(), Column: col.String()}
}
}
}
// validate all columns referenced by indexes do in fact exist
for _, key := range c.CreateTable.TableSpec.Indexes {
for colName := range getKeyColumnNames(key) {
Expand Down Expand Up @@ -2154,23 +2258,6 @@ func (c *CreateTableEntity) validate() error {
}
}
}
// validate all columns used by foreign key constraints do in fact exist,
// and that there exists an index over those columns
for _, cs := range c.CreateTable.TableSpec.Constraints {
check, ok := cs.Details.(*sqlparser.ForeignKeyDefinition)
if !ok {
continue
}
if len(check.Source) != len(check.ReferenceDefinition.ReferencedColumns) {
return &ForeignKeyColumnCountMismatchError{Table: c.Name(), Constraint: cs.Name.String(), ColumnCount: len(check.Source), ReferencedTable: check.ReferenceDefinition.ReferencedTable.Name.String(), ReferencedColumnCount: len(check.ReferenceDefinition.ReferencedColumns)}
}
for _, col := range check.Source {
if !columnExists[col.Lowered()] {
return &InvalidColumnInForeignKeyConstraintError{Table: c.Name(), Constraint: cs.Name.String(), Column: col.String()}
}
}
// TODO(shlomi): find a valid index
}
// validate all columns referenced by constraint checks do in fact exist
for _, cs := range c.CreateTable.TableSpec.Constraints {
check, ok := cs.Details.(*sqlparser.CheckConstraintDefinition)
Expand Down
Loading

0 comments on commit ae8406d

Please sign in to comment.