Skip to content

Commit

Permalink
ddl: check index is needed in foreign key when drop index (#37813)
Browse files Browse the repository at this point in the history
close #37812
  • Loading branch information
crazycs520 authored Sep 15, 2022
1 parent fae88ae commit f80a42d
Show file tree
Hide file tree
Showing 11 changed files with 214 additions and 9 deletions.
4 changes: 4 additions & 0 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -6303,6 +6303,10 @@ func (d *ddl) dropIndex(ctx sessionctx.Context, ti ast.Ident, indexName model.CI
if err != nil {
return errors.Trace(err)
}
err = checkIndexNeededInForeignKey(is, schema.Name.L, t.Meta(), indexInfo)
if err != nil {
return err
}

jobTp := model.ActionDropIndex
if isPK {
Expand Down
63 changes: 63 additions & 0 deletions ddl/foreign_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,3 +351,66 @@ func checkTableHasForeignKeyReferredInOwner(d *ddlCtx, t *meta.Meta, schema, tbl
referredFK := checkTableHasForeignKeyReferred(is, schema, tbl, ignoreTables, fkCheck)
return referredFK, nil
}

func checkIndexNeededInForeignKey(is infoschema.InfoSchema, dbName string, tbInfo *model.TableInfo, idxInfo *model.IndexInfo) error {
referredFKs := is.GetTableReferredForeignKeys(dbName, tbInfo.Name.L)
if len(tbInfo.ForeignKeys) == 0 && len(referredFKs) == 0 {
return nil
}
remainIdxs := make([]*model.IndexInfo, 0, len(tbInfo.Indices))
for _, idx := range tbInfo.Indices {
if idx.ID == idxInfo.ID {
continue
}
remainIdxs = append(remainIdxs, idx)
}
checkFn := func(cols []model.CIStr) error {
if !model.IsIndexPrefixCovered(tbInfo, idxInfo, cols...) {
return nil
}
if tbInfo.PKIsHandle && len(cols) == 1 {
refColInfo := model.FindColumnInfo(tbInfo.Columns, cols[0].L)
if refColInfo != nil && mysql.HasPriKeyFlag(refColInfo.GetFlag()) {
return nil
}
}
for _, index := range remainIdxs {
if model.IsIndexPrefixCovered(tbInfo, index, cols...) {
return nil
}
}
return dbterror.ErrDropIndexNeededInForeignKey.GenWithStackByArgs(idxInfo.Name)
}
for _, fk := range tbInfo.ForeignKeys {
if fk.Version < model.FKVersion1 {
continue
}
err := checkFn(fk.Cols)
if err != nil {
return err
}
}
for _, referredFK := range referredFKs {
err := checkFn(referredFK.Cols)
if err != nil {
return err
}
}
return nil
}

func checkIndexNeededInForeignKeyInOwner(d *ddlCtx, t *meta.Meta, job *model.Job, dbName string, tbInfo *model.TableInfo, idxInfo *model.IndexInfo) error {
if !variable.EnableForeignKey.Load() {
return nil
}
is, err := getAndCheckLatestInfoSchema(d, t)
if err != nil {
return err
}
err = checkIndexNeededInForeignKey(is, dbName, tbInfo, idxInfo)
if err != nil {
job.State = model.JobStateCancelled
return err
}
return nil
}
127 changes: 127 additions & 0 deletions ddl/foreign_key_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -978,6 +978,133 @@ func TestDropTableWithForeignKeyReferred(t *testing.T) {
tk.MustQuery("show tables").Check(testkit.Rows("t1", "t2", "t3"))
}

func TestDropIndexNeededInForeignKey(t *testing.T) {
store, _ := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set @@global.tidb_enable_foreign_key=1")
tk.MustExec("set @@foreign_key_checks=1")
tk.MustExec("use test")

cases := []struct {
prepares []string
drops []string
err string
}{
{
prepares: []string{
"create table t1 (id int key, b int, index idx (b))",
"create table t2 (a int, b int, index idx (b), foreign key fk_b(b) references t1(b));",
},
drops: []string{
"alter table t1 drop index idx",
"alter table t2 drop index idx",
},
err: "[ddl:1553]Cannot drop index 'idx': needed in a foreign key constraint",
},
{
prepares: []string{
"create table t1 (id int, b int, index idx (id, b))",
"create table t2 (a int, b int, index idx (b, a), foreign key fk_b(b) references t1(id));",
},
drops: []string{
"alter table t1 drop index idx",
"alter table t2 drop index idx",
},
err: "[ddl:1553]Cannot drop index 'idx': needed in a foreign key constraint",
},
}

for _, ca := range cases {
tk.MustExec("drop table if exists t2")
tk.MustExec("drop table if exists t1")
for _, sql := range ca.prepares {
tk.MustExec(sql)
}
for _, drop := range ca.drops {
// even disable foreign key check, still can't drop the index used by foreign key.
tk.MustExec("set @@foreign_key_checks=0;")
err := tk.ExecToErr(drop)
require.Error(t, err)
require.Equal(t, ca.err, err.Error())
tk.MustExec("set @@foreign_key_checks=1;")
err = tk.ExecToErr(drop)
require.Error(t, err)
require.Equal(t, ca.err, err.Error())
}
}
passCases := [][]string{
{
"create table t1 (id int key, b int, index idxb (b))",
"create table t2 (a int, b int key, index idxa (a),index idxb (b), foreign key fk_b(b) references t1(id));",
"alter table t1 drop index idxb",
"alter table t2 drop index idxa",
"alter table t2 drop index idxb",
},
{
"create table t1 (id int key, b int, index idxb (b), unique index idx(b, id))",
"create table t2 (a int, b int key, index idx (b, a),index idxb (b), index idxab(a, b), foreign key fk_b(b) references t1(b));",
"alter table t1 drop index idxb",
"alter table t1 add index idxb (b)",
"alter table t1 drop index idx",
"alter table t2 drop index idx",
"alter table t2 add index idx (b, a)",
"alter table t2 drop index idxb",
"alter table t2 drop index idxab",
},
}
tk.MustExec("set @@foreign_key_checks=1;")
for _, ca := range passCases {
tk.MustExec("drop table if exists t2")
tk.MustExec("drop table if exists t1")
for _, sql := range ca {
tk.MustExec(sql)
}
}
}

func TestDropIndexNeededInForeignKey2(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, testLease)
d := dom.DDL()
tk := testkit.NewTestKit(t, store)
tk.MustExec("set @@global.tidb_enable_foreign_key=1")
tk.MustExec("set @@foreign_key_checks=1;")
tk.MustExec("use test")
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("set @@global.tidb_enable_foreign_key=1")
tk2.MustExec("set @@foreign_key_checks=1;")
tk2.MustExec("use test")
tk3 := testkit.NewTestKit(t, store)
tk3.MustExec("set @@global.tidb_enable_foreign_key=1")
tk3.MustExec("set @@foreign_key_checks=1;")
tk3.MustExec("use test")
tk.MustExec("create table t1 (id int key, b int)")
tk.MustExec("create table t2 (a int, b int, index idx1 (b),index idx2 (b), foreign key (b) references t1(id));")

var wg sync.WaitGroup
var dropErr error
tc := &ddl.TestDDLCallback{}
tc.OnJobRunBeforeExported = func(job *model.Job) {
if job.SchemaState != model.StatePublic || job.Type != model.ActionDropIndex {
return
}
wg.Add(1)
go func() {
defer wg.Done()
dropErr = tk2.ExecToErr("alter table t2 drop index idx2")
}()
// make sure tk2's ddl job already put into ddl job queue.
time.Sleep(time.Millisecond * 100)
}
originalHook := d.GetHook()
defer d.SetHook(originalHook)
d.SetHook(tc)

tk.MustExec("alter table t2 drop index idx1")
wg.Wait()
require.Error(t, dropErr)
require.Equal(t, "[ddl:1553]Cannot drop index 'idx2': needed in a foreign key constraint", dropErr.Error())
}

func getTableInfo(t *testing.T, dom *domain.Domain, db, tb string) *model.TableInfo {
err := dom.Reload()
require.NoError(t, err)
Expand Down
8 changes: 6 additions & 2 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -773,7 +773,7 @@ func runReorgJobAndHandleErr(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
}

func onDropIndex(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) {
tblInfo, indexInfo, ifExists, err := checkDropIndex(t, job)
tblInfo, indexInfo, ifExists, err := checkDropIndex(d, t, job)
if err != nil {
if ifExists && dbterror.ErrCantDropFieldOrKey.Equal(err) {
job.Warning = toTError(err)
Expand Down Expand Up @@ -888,7 +888,7 @@ func removeIndexInfo(tblInfo *model.TableInfo, idxInfo *model.IndexInfo) {
tblInfo.Indices = append(tblInfo.Indices[:offset], tblInfo.Indices[offset+1:]...)
}

func checkDropIndex(t *meta.Meta, job *model.Job) (*model.TableInfo, *model.IndexInfo, bool /* ifExists */, error) {
func checkDropIndex(d *ddlCtx, t *meta.Meta, job *model.Job) (*model.TableInfo, *model.IndexInfo, bool /* ifExists */, error) {
schemaID := job.SchemaID
tblInfo, err := GetTableInfoAndCancelFaultJob(t, job, schemaID)
if err != nil {
Expand Down Expand Up @@ -921,6 +921,10 @@ func checkDropIndex(t *meta.Meta, job *model.Job) (*model.TableInfo, *model.Inde
return nil, nil, false, errors.Trace(err)
}

// Double check for drop index needed in foreign key.
if err := checkIndexNeededInForeignKeyInOwner(d, t, job, job.SchemaName, tblInfo, indexInfo); err != nil {
return nil, nil, false, errors.Trace(err)
}
return tblInfo, indexInfo, false, nil
}

Expand Down
6 changes: 3 additions & 3 deletions ddl/rollingback.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,8 @@ func rollingbackDropColumn(t *meta.Meta, job *model.Job) (ver int64, err error)
return ver, nil
}

func rollingbackDropIndex(t *meta.Meta, job *model.Job) (ver int64, err error) {
_, indexInfo, _, err := checkDropIndex(t, job)
func rollingbackDropIndex(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
_, indexInfo, _, err := checkDropIndex(d, t, job)
if err != nil {
return ver, errors.Trace(err)
}
Expand Down Expand Up @@ -368,7 +368,7 @@ func convertJob2RollbackJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job)
case model.ActionDropColumn:
ver, err = rollingbackDropColumn(t, job)
case model.ActionDropIndex, model.ActionDropPrimaryKey:
ver, err = rollingbackDropIndex(t, job)
ver, err = rollingbackDropIndex(d, t, job)
case model.ActionDropTable, model.ActionDropView, model.ActionDropSequence:
err = rollingbackDropTableOrView(t, job)
case model.ActionDropTablePartition:
Expand Down
2 changes: 1 addition & 1 deletion errno/errcode.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,7 @@ const (
ErrEventCompile = 1550
ErrEventSameName = 1551
ErrEventDataTooLong = 1552
ErrDropIndexFk = 1553
ErrDropIndexNeededInForeignKey = 1553
ErrWarnDeprecatedSyntaxWithVer = 1554
ErrCantWriteLockLogTable = 1555
ErrCantLockLogTable = 1556
Expand Down
2 changes: 1 addition & 1 deletion errno/errname.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ var MySQLErrName = map[uint16]*mysql.ErrMessage{
ErrEventCompile: mysql.Message("Error during compilation of event's body", nil),
ErrEventSameName: mysql.Message("Same old and new event name", nil),
ErrEventDataTooLong: mysql.Message("Data for column '%s' too long", nil),
ErrDropIndexFk: mysql.Message("Cannot drop index '%-.192s': needed in a foreign key constraint", nil),
ErrDropIndexNeededInForeignKey: mysql.Message("Cannot drop index '%-.192s': needed in a foreign key constraint", nil),
ErrWarnDeprecatedSyntaxWithVer: mysql.Message("The syntax '%s' is deprecated and will be removed in MySQL %s. Please use %s instead", nil),
ErrCantWriteLockLogTable: mysql.Message("You can't write-lock a log table. Only read access is possible", nil),
ErrCantLockLogTable: mysql.Message("You can't use locks with log tables.", nil),
Expand Down
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -896,6 +896,11 @@ error = '''
Duplicate partition name %-.192s
'''

["ddl:1553"]
error = '''
Cannot drop index '%-.192s': needed in a foreign key constraint
'''

["ddl:1562"]
error = '''
Cannot create temporary table with partitions
Expand Down
2 changes: 1 addition & 1 deletion parser/mysql/errcode.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ const (
ErrEventCompile = 1550
ErrEventSameName = 1551
ErrEventDataTooLong = 1552
ErrDropIndexFk = 1553
ErrDropIndexNeededInForeignKey = 1553
ErrWarnDeprecatedSyntaxWithVer = 1554
ErrCantWriteLockLogTable = 1555
ErrCantLockLogTable = 1556
Expand Down
2 changes: 1 addition & 1 deletion parser/mysql/errname.go
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ var MySQLErrName = map[uint16]*ErrMessage{
ErrEventCompile: Message("Error during compilation of event's body", nil),
ErrEventSameName: Message("Same old and new event name", nil),
ErrEventDataTooLong: Message("Data for column '%s' too long", nil),
ErrDropIndexFk: Message("Cannot drop index '%-.192s': needed in a foreign key constraint", nil),
ErrDropIndexNeededInForeignKey: Message("Cannot drop index '%-.192s': needed in a foreign key constraint", nil),
ErrWarnDeprecatedSyntaxWithVer: Message("The syntax '%s' is deprecated and will be removed in MySQL %s. Please use %s instead", nil),
ErrCantWriteLockLogTable: Message("You can't write-lock a log table. Only read access is possible", nil),
ErrCantLockLogTable: Message("You can't use locks with log tables.", nil),
Expand Down
2 changes: 2 additions & 0 deletions util/dbterror/ddl_terror.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,8 @@ var (
// ErrUnsupportedTiFlashOperationForUnsupportedCharsetTable is used when alter alter tiflash related action(e.g. set tiflash mode, set tiflash replica) with unsupported charset.
ErrUnsupportedTiFlashOperationForUnsupportedCharsetTable = ClassDDL.NewStdErr(mysql.ErrUnsupportedDDLOperation, parser_mysql.Message(fmt.Sprintf(mysql.MySQLErrName[mysql.ErrUnsupportedDDLOperation].Raw, "ALTER TiFlash settings for tables not supported by TiFlash: table contains %s charset"), nil))

// ErrDropIndexNeededInForeignKey returns when drop index which is needed in foreign key.
ErrDropIndexNeededInForeignKey = ClassDDL.NewStd(mysql.ErrDropIndexNeededInForeignKey)
// ErrForeignKeyCannotDropParent returns when drop table which has foreign key referred.
ErrForeignKeyCannotDropParent = ClassDDL.NewStd(mysql.ErrForeignKeyCannotDropParent)
// ErrTruncateIllegalForeignKey returns when truncate table which has foreign key referred.
Expand Down

0 comments on commit f80a42d

Please sign in to comment.