Skip to content

Commit

Permalink
ddl: add ddl job error count limit, exceed the limit should cancel th…
Browse files Browse the repository at this point in the history
…e ddl job
  • Loading branch information
crazycs520 committed Feb 13, 2019
1 parent 8431d11 commit 4948905
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 5 deletions.
7 changes: 7 additions & 0 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tidb/util/admin"
log "github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -537,10 +538,16 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,

job.Error = toTError(err)
job.ErrorCount++
// Check error limit to avoid falling into an infinite loop.
if job.ErrorCount > ddlErrorCountLimitCnt && job.State == model.JobStateRunning && admin.IsJobRollbackable(job) == nil {
job.State = model.JobStateCancelling
}
}
return
}

const ddlErrorCountLimitCnt = 512

func toTError(err error) *terror.Error {
originErr := errors.Cause(err)
tErr, ok := originErr.(*terror.Error)
Expand Down
11 changes: 11 additions & 0 deletions ddl/serial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,3 +458,14 @@ func (s *testSerialSuite) TestRestoreTableByTableNameFail(c *C) {
tk.MustExec("insert into t_recover values (4),(5),(6)")
tk.MustQuery("select * from t_recover;").Check(testkit.Rows("1", "2", "3", "4", "5", "6"))
}

func (s *testSerialSuite) TestCancelJobByErrorCountLimit(c *C) {
tk := testkit.NewTestKit(c, s.store)
gofail.Enable("github.com/pingcap/tidb/ddl/mockExceedErrorLimit", `return(true)`)
defer gofail.Disable("github.com/pingcap/tidb/ddl/mockExceedErrorLimit")
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
_, err := tk.Exec("create table t (a int)")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[ddl:12]cancelled DDL job")
}
5 changes: 5 additions & 0 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ import (
)

func onCreateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) {
// gofail: var mockExceedErrorLimit bool
// if mockExceedErrorLimit {
// return ver, errors.New("mock do job error")
// }

schemaID := job.SchemaID
tbInfo := &model.TableInfo{}
if err := job.DecodeArgs(tbInfo); err != nil {
Expand Down
10 changes: 5 additions & 5 deletions util/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,23 +83,23 @@ func GetDDLInfo(txn kv.Transaction) (*DDLInfo, error) {
return info, nil
}

func isJobRollbackable(job *model.Job, id int64) error {
func IsJobRollbackable(job *model.Job) error {
switch job.Type {
case model.ActionDropIndex:
// We can't cancel if index current state is in StateDeleteOnly or StateDeleteReorganization, otherwise will cause inconsistent between record and index.
if job.SchemaState == model.StateDeleteOnly ||
job.SchemaState == model.StateDeleteReorganization {
return ErrCannotCancelDDLJob.GenWithStackByArgs(id)
return ErrCannotCancelDDLJob.GenWithStackByArgs(job.ID)
}
case model.ActionDropColumn:
if job.SchemaState != model.StateNone {
return ErrCannotCancelDDLJob.GenWithStackByArgs(id)
return ErrCannotCancelDDLJob.GenWithStackByArgs(job.ID)
}
case model.ActionDropSchema, model.ActionDropTable:
// To simplify the rollback logic, cannot be canceled in the following states.
if job.SchemaState == model.StateWriteOnly ||
job.SchemaState == model.StateDeleteOnly {
return ErrCannotCancelDDLJob.GenWithStackByArgs(id)
return ErrCannotCancelDDLJob.GenWithStackByArgs(job.ID)
}
}
return nil
Expand Down Expand Up @@ -135,7 +135,7 @@ func CancelJobs(txn kv.Transaction, ids []int64) ([]error, error) {
if job.IsCancelled() || job.IsRollingback() || job.IsRollbackDone() {
continue
}
errs[i] = isJobRollbackable(job, id)
errs[i] = IsJobRollbackable(job)
if errs[i] != nil {
continue
}
Expand Down

0 comments on commit 4948905

Please sign in to comment.