Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: add ddl job error count limit, exceed the limit should cancel the ddl job #9295

Merged
merged 14 commits into from
Feb 27, 2019
Merged
23 changes: 23 additions & 0 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util/admin"
log "github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -542,10 +545,30 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,

job.Error = toTError(err)
job.ErrorCount++
// Load global ddl variables.
if err1 := loadDDLVars(w); err1 != nil {
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
log.Errorf("[ddl-%s] load ddl global variable error: %v", w, err1)
}
// Check error limit to avoid falling into an infinite loop.
if job.ErrorCount > variable.GetDDLErrorCountLimit() && job.State == model.JobStateRunning && admin.IsJobRollbackable(job) {
log.Warnf("[ddl-%s] the job id %v error count exceed the limit: %v, cancelling it now", w, job.ID, variable.GetDDLErrorCountLimit())
job.State = model.JobStateCancelling
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
}
}
return
}

func loadDDLVars(w *worker) error {
// Get sessionctx from context resource pool.
var ctx sessionctx.Context
ctx, err := w.sessPool.get()
if err != nil {
return errors.Trace(err)
}
defer w.sessPool.put(ctx)
return util.LoadDDLVars(ctx)
}

func toTError(err error) *terror.Error {
originErr := errors.Cause(err)
tErr, ok := originErr.(*terror.Error)
Expand Down
11 changes: 11 additions & 0 deletions ddl/serial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,3 +458,14 @@ func (s *testSerialSuite) TestRestoreTableByTableNameFail(c *C) {
tk.MustExec("insert into t_recover values (4),(5),(6)")
tk.MustQuery("select * from t_recover;").Check(testkit.Rows("1", "2", "3", "4", "5", "6"))
}

func (s *testSerialSuite) TestCancelJobByErrorCountLimit(c *C) {
tk := testkit.NewTestKit(c, s.store)
gofail.Enable("github.com/pingcap/tidb/ddl/mockExceedErrorLimit", `return(true)`)
defer gofail.Disable("github.com/pingcap/tidb/ddl/mockExceedErrorLimit")
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
_, err := tk.Exec("create table t (a int)")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[ddl:12]cancelled DDL job")
}
5 changes: 5 additions & 0 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ import (
)

func onCreateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) {
// gofail: var mockExceedErrorLimit bool
// if mockExceedErrorLimit {
// return ver, errors.New("mock do job error")
// }

schemaID := job.SchemaID
tbInfo := &model.TableInfo{}
if err := job.DecodeArgs(tbInfo); err != nil {
Expand Down
26 changes: 21 additions & 5 deletions ddl/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,14 +135,30 @@ func UpdateDeleteRange(ctx sessionctx.Context, dr DelRangeTask, newStartKey, old
return errors.Trace(err)
}

const loadDDLReorgVarsSQL = "select HIGH_PRIORITY variable_name, variable_value from mysql.global_variables where variable_name in ('" +
variable.TiDBDDLReorgWorkerCount + "', '" +
variable.TiDBDDLReorgBatchSize + "')"

// LoadDDLReorgVars loads ddl reorg variable from mysql.global_variables.
func LoadDDLReorgVars(ctx sessionctx.Context) error {
return LoadGlobalVars(ctx, []string{variable.TiDBDDLReorgWorkerCount, variable.TiDBDDLReorgBatchSize})
}

// LoadDDLVars loads ddl variable from mysql.global_variables.
func LoadDDLVars(ctx sessionctx.Context) error {
return LoadGlobalVars(ctx, []string{variable.TiDBDDLErrorCountLimit})
}

const loadGlobalVarsSQL = "select HIGH_PRIORITY variable_name, variable_value from mysql.global_variables where variable_name in (%s)"

// LoadGlobalVars loads global variable from mysql.global_variables.
func LoadGlobalVars(ctx sessionctx.Context, varNames []string) error {
if sctx, ok := ctx.(sqlexec.RestrictedSQLExecutor); ok {
rows, _, err := sctx.ExecRestrictedSQL(ctx, loadDDLReorgVarsSQL)
nameList := ""
for i, name := range varNames {
if i > 0 {
nameList += ", "
}
nameList += fmt.Sprintf("'%s'", name)
}
sql := fmt.Sprintf(loadGlobalVarsSQL, nameList)
rows, _, err := sctx.ExecRestrictedSQL(ctx, sql)
if err != nil {
return errors.Trace(err)
}
Expand Down
27 changes: 27 additions & 0 deletions executor/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,33 @@ func (s *testSuite3) TestSetDDLReorgBatchSize(c *C) {
res.Check(testkit.Rows("1000"))
}

func (s *testSuite3) TestSetDDLErrorCountLimit(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
err := ddlutil.LoadDDLVars(tk.Se)
c.Assert(err, IsNil)
c.Assert(variable.GetDDLErrorCountLimit(), Equals, int64(variable.DefTiDBDDLErrorCountLimit))

tk.MustExec("set @@global.tidb_ddl_error_count_limit = -1")
tk.MustQuery("show warnings;").Check(testkit.Rows("Warning 1292 Truncated incorrect tidb_ddl_error_count_limit value: '-1'"))
err = ddlutil.LoadDDLVars(tk.Se)
c.Assert(err, IsNil)
c.Assert(variable.GetDDLErrorCountLimit(), Equals, int64(0))
tk.MustExec(fmt.Sprintf("set @@global.tidb_ddl_error_count_limit = %v", uint64(math.MaxInt64)+1))
tk.MustQuery("show warnings;").Check(testkit.Rows(fmt.Sprintf("Warning 1292 Truncated incorrect tidb_ddl_error_count_limit value: '%d'", uint64(math.MaxInt64)+1)))
err = ddlutil.LoadDDLVars(tk.Se)
c.Assert(err, IsNil)
c.Assert(variable.GetDDLErrorCountLimit(), Equals, int64(math.MaxInt64))
_, err = tk.Exec("set @@global.tidb_ddl_error_count_limit = invalid_val")
c.Assert(terror.ErrorEqual(err, variable.ErrWrongTypeForVar), IsTrue, Commentf("err %v", err))
tk.MustExec("set @@global.tidb_ddl_error_count_limit = 100")
err = ddlutil.LoadDDLVars(tk.Se)
c.Assert(err, IsNil)
c.Assert(variable.GetDDLErrorCountLimit(), Equals, int64(100))
res := tk.MustQuery("select @@global.tidb_ddl_error_count_limit")
res.Check(testkit.Rows("100"))
}

// Test issue #9205, fix the precision problem for time type default values
// See https://github.com/pingcap/tidb/issues/9205 for details
func (s *testSuite3) TestIssue9205(c *C) {
Expand Down
1 change: 1 addition & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1484,6 +1484,7 @@ var builtinGlobalVariable = []string{
variable.TiDBConstraintCheckInPlace,
variable.TiDBDDLReorgWorkerCount,
variable.TiDBDDLReorgBatchSize,
variable.TiDBDDLErrorCountLimit,
variable.TiDBOptInSubqToJoinAndAgg,
variable.TiDBDistSQLScanConcurrency,
variable.TiDBInitChunkSize,
Expand Down
2 changes: 2 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,8 @@ func SetLocalSystemVar(name string, val string) {
SetDDLReorgWorkerCounter(int32(tidbOptPositiveInt32(val, DefTiDBDDLReorgWorkerCount)))
case TiDBDDLReorgBatchSize:
SetDDLReorgBatchSize(int32(tidbOptPositiveInt32(val, DefTiDBDDLReorgBatchSize)))
case TiDBDDLErrorCountLimit:
SetDDLErrorCountLimit(tidbOptInt64(val, DefTiDBDDLErrorCountLimit))
}
}

Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,7 @@ var defaultSysVars = []*SysVar{
{ScopeSession, TiDBConfig, ""},
{ScopeGlobal, TiDBDDLReorgWorkerCount, strconv.Itoa(DefTiDBDDLReorgWorkerCount)},
{ScopeGlobal, TiDBDDLReorgBatchSize, strconv.Itoa(DefTiDBDDLReorgBatchSize)},
{ScopeGlobal, TiDBDDLErrorCountLimit, strconv.Itoa(DefTiDBDDLErrorCountLimit)},
{ScopeSession, TiDBDDLReorgPriority, "PRIORITY_LOW"},
{ScopeSession, TiDBForcePriority, mysql.Priority2Str[DefTiDBForcePriority]},
{ScopeSession, TiDBEnableRadixJoin, BoolToIntStr(DefTiDBUseRadixJoin)},
Expand Down
5 changes: 5 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,9 @@ const (
// tidb_ddl_reorg_batch_size defines the transaction batch size of ddl reorg workers.
TiDBDDLReorgBatchSize = "tidb_ddl_reorg_batch_size"

// tidb_ddl_error_count_limit defines the count of ddl error limit.
TiDBDDLErrorCountLimit = "tidb_ddl_error_count_limit"

// tidb_ddl_reorg_priority defines the operations priority of adding indices.
// It can be: PRIORITY_LOW, PRIORITY_NORMAL, PRIORITY_HIGH
TiDBDDLReorgPriority = "tidb_ddl_reorg_priority"
Expand Down Expand Up @@ -276,6 +279,7 @@ const (
DefTiDBOptimizerSelectivityLevel = 0
DefTiDBDDLReorgWorkerCount = 16
DefTiDBDDLReorgBatchSize = 1024
DefTiDBDDLErrorCountLimit = 512
DefTiDBHashAggPartialConcurrency = 4
DefTiDBHashAggFinalConcurrency = 4
DefTiDBForcePriority = mysql.NoPriority
Expand All @@ -289,6 +293,7 @@ var (
ddlReorgWorkerCounter int32 = DefTiDBDDLReorgWorkerCount
maxDDLReorgWorkerCount int32 = 128
ddlReorgBatchSize int32 = DefTiDBDDLReorgBatchSize
ddlErrorCountlimit int64 = DefTiDBDDLErrorCountLimit
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
// Export for testing.
MaxDDLReorgBatchSize int32 = 10240
MinDDLReorgBatchSize int32 = 32
Expand Down
12 changes: 12 additions & 0 deletions sessionctx/variable/varsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,16 @@ func GetDDLReorgBatchSize() int32 {
return atomic.LoadInt32(&ddlReorgBatchSize)
}

// SetDDLErrorCountLimit sets ddlErrorCountlimit size.
func SetDDLErrorCountLimit(cnt int64) {
atomic.StoreInt64(&ddlErrorCountlimit, cnt)
}

// GetDDLErrorCountLimit gets ddlErrorCountlimit size.
func GetDDLErrorCountLimit() int64 {
return atomic.LoadInt64(&ddlErrorCountlimit)
}

// GetSessionSystemVar gets a system variable.
// If it is a session only variable, use the default value defined in code.
// Returns error if there is no such variable.
Expand Down Expand Up @@ -363,6 +373,8 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string,
return value, ErrWrongValueForVar.GenWithStackByArgs(name, value)
case TiDBDDLReorgBatchSize:
return checkUInt64SystemVar(name, value, uint64(MinDDLReorgBatchSize), uint64(MaxDDLReorgBatchSize), vars)
case TiDBDDLErrorCountLimit:
return checkUInt64SystemVar(name, value, uint64(0), math.MaxInt64, vars)
case TiDBIndexLookupConcurrency, TiDBIndexLookupJoinConcurrency, TiDBIndexJoinBatchSize,
TiDBIndexLookupSize,
TiDBHashJoinConcurrency,
Expand Down
15 changes: 8 additions & 7 deletions util/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,30 +83,31 @@ func GetDDLInfo(txn kv.Transaction) (*DDLInfo, error) {
return info, nil
}

func isJobRollbackable(job *model.Job, id int64) error {
// IsJobRollbackable checks whether the job can be rollback.
func IsJobRollbackable(job *model.Job) bool {
switch job.Type {
case model.ActionDropIndex:
// We can't cancel if index current state is in StateDeleteOnly or StateDeleteReorganization, otherwise will cause inconsistent between record and index.
if job.SchemaState == model.StateDeleteOnly ||
job.SchemaState == model.StateDeleteReorganization {
return ErrCannotCancelDDLJob.GenWithStackByArgs(id)
return false
}
case model.ActionDropSchema, model.ActionDropTable:
// To simplify the rollback logic, cannot be canceled in the following states.
if job.SchemaState == model.StateWriteOnly ||
job.SchemaState == model.StateDeleteOnly {
return ErrCannotCancelDDLJob.GenWithStackByArgs(id)
return false
}
case model.ActionDropColumn, model.ActionModifyColumn,
model.ActionDropTablePartition, model.ActionAddTablePartition,
model.ActionRebaseAutoID, model.ActionShardRowID,
model.ActionTruncateTable, model.ActionAddForeignKey,
model.ActionDropForeignKey:
if job.SchemaState != model.StateNone {
return ErrCannotCancelDDLJob.GenWithStackByArgs(id)
return false
}
}
return nil
return true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems some DDL jobs don't support rollback, and the function returns true. Do we need to wait for other DDL jobs support rollback?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mostly ddl rollback is supported.

}

// CancelJobs cancels the DDL jobs.
Expand Down Expand Up @@ -139,8 +140,8 @@ func CancelJobs(txn kv.Transaction, ids []int64) ([]error, error) {
if job.IsCancelled() || job.IsRollingback() || job.IsRollbackDone() {
continue
}
errs[i] = isJobRollbackable(job, id)
if errs[i] != nil {
if !IsJobRollbackable(job) {
errs[i] = ErrCannotCancelDDLJob.GenWithStackByArgs(job.ID)
continue
}

Expand Down