Skip to content

Commit

Permalink
variable: remove tidb_ddl_reorg_worker_cnt and tidb_ddl_reorg_batch_s…
Browse files Browse the repository at this point in the history
…ize session level. (#8941)
  • Loading branch information
crazycs520 authored and jackysp committed Jan 10, 2019
1 parent 3b40b14 commit 858b200
Show file tree
Hide file tree
Showing 8 changed files with 91 additions and 33 deletions.
7 changes: 5 additions & 2 deletions ddl/failtest/fail_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/ddl/testutil"
ddlutil "github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/session"
Expand Down Expand Up @@ -342,11 +343,13 @@ func (s *testFailDBSuite) TestAddIndexWorkerNum(c *C) {
// Split table to multi region.
s.cluster.SplitTable(s.mvccStore, tbl.Meta().ID, splitCount)

err = ddlutil.LoadDDLReorgVars(tk.Se)
c.Assert(err, IsNil)
originDDLAddIndexWorkerCnt := variable.GetDDLReorgWorkerCounter()
lastSetWorkerCnt := originDDLAddIndexWorkerCnt
atomic.StoreInt32(&ddl.TestCheckWorkerNumber, lastSetWorkerCnt)
ddl.TestCheckWorkerNumber = lastSetWorkerCnt
defer variable.SetDDLReorgWorkerCounter(originDDLAddIndexWorkerCnt)
defer tk.MustExec(fmt.Sprintf("set @@global.tidb_ddl_reorg_worker_cnt=%d", originDDLAddIndexWorkerCnt))

gofail.Enable("github.com/pingcap/tidb/ddl/checkIndexWorkerNum", `return(true)`)
defer gofail.Disable("github.com/pingcap/tidb/ddl/checkIndexWorkerNum")
Expand All @@ -364,7 +367,7 @@ LOOP:
c.Assert(err, IsNil, Commentf("err:%v", errors.ErrorStack(err)))
case <-ddl.TestCheckWorkerNumCh:
lastSetWorkerCnt = int32(rand.Intn(8) + 8)
tk.MustExec(fmt.Sprintf("set @@tidb_ddl_reorg_worker_cnt=%d", lastSetWorkerCnt))
tk.MustExec(fmt.Sprintf("set @@global.tidb_ddl_reorg_worker_cnt=%d", lastSetWorkerCnt))
atomic.StoreInt32(&ddl.TestCheckWorkerNumber, lastSetWorkerCnt)
checkNum++
}
Expand Down
15 changes: 15 additions & 0 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
ddlutil "github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -1080,6 +1081,17 @@ var (
TestCheckWorkerNumber = int32(16)
)

func loadDDLReorgVars(w *worker) error {
// Get sessionctx from context resource pool.
var ctx sessionctx.Context
ctx, err := w.sessPool.get()
if err != nil {
return errors.Trace(err)
}
defer w.sessPool.put(ctx)
return ddlutil.LoadDDLReorgVars(ctx)
}

// addPhysicalTableIndex handles the add index reorganization state for a non-partitioned table or a partition.
// For a partitioned table, it should be handled partition by partition.
//
Expand Down Expand Up @@ -1120,6 +1132,9 @@ func (w *worker) addPhysicalTableIndex(t table.PhysicalTable, indexInfo *model.I
}

// For dynamic adjust add index worker number.
if err := loadDDLReorgVars(w); err != nil {
log.Error(err)
}
workerCnt = variable.GetDDLReorgWorkerCounter()
// If only have 1 range, we can only start 1 worker.
if len(kvRanges) < int(workerCnt) {
Expand Down
21 changes: 21 additions & 0 deletions ddl/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/sqlexec"
)
Expand Down Expand Up @@ -128,3 +129,23 @@ func UpdateDeleteRange(ctx sessionctx.Context, dr DelRangeTask, newStartKey, old
_, err := ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql)
return errors.Trace(err)
}

const loadDDLReorgVarsSQL = "select HIGH_PRIORITY variable_name, variable_value from mysql.global_variables where variable_name in ('" +
variable.TiDBDDLReorgWorkerCount + "', '" +
variable.TiDBDDLReorgBatchSize + "')"

// LoadDDLReorgVars loads ddl reorg variable from mysql.global_variables.
func LoadDDLReorgVars(ctx sessionctx.Context) error {
if sctx, ok := ctx.(sqlexec.RestrictedSQLExecutor); ok {
rows, _, err := sctx.ExecRestrictedSQL(ctx, loadDDLReorgVarsSQL)
if err != nil {
return errors.Trace(err)
}
for _, row := range rows {
varName := row.GetString(0)
varValue := row.GetString(1)
variable.SetLocalSystemVar(varName, varValue)
}
}
return nil
}
52 changes: 36 additions & 16 deletions executor/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/ddl"
ddlutil "github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/meta/autoid"
plannercore "github.com/pingcap/tidb/planner/core"
Expand Down Expand Up @@ -510,24 +511,32 @@ func (s *testSuite3) TestMaxHandleAddIndex(c *C) {
func (s *testSuite3) TestSetDDLReorgWorkerCnt(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
err := ddlutil.LoadDDLReorgVars(tk.Se)
c.Assert(err, IsNil)
c.Assert(variable.GetDDLReorgWorkerCounter(), Equals, int32(variable.DefTiDBDDLReorgWorkerCount))
tk.MustExec("set tidb_ddl_reorg_worker_cnt = 1")
tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt = 1")
err = ddlutil.LoadDDLReorgVars(tk.Se)
c.Assert(err, IsNil)
c.Assert(variable.GetDDLReorgWorkerCounter(), Equals, int32(1))
tk.MustExec("set tidb_ddl_reorg_worker_cnt = 100")
tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt = 100")
err = ddlutil.LoadDDLReorgVars(tk.Se)
c.Assert(err, IsNil)
c.Assert(variable.GetDDLReorgWorkerCounter(), Equals, int32(100))
_, err := tk.Exec("set tidb_ddl_reorg_worker_cnt = invalid_val")
_, err = tk.Exec("set @@global.tidb_ddl_reorg_worker_cnt = invalid_val")
c.Assert(terror.ErrorEqual(err, variable.ErrWrongTypeForVar), IsTrue, Commentf("err %v", err))
tk.MustExec("set tidb_ddl_reorg_worker_cnt = 100")
tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt = 100")
err = ddlutil.LoadDDLReorgVars(tk.Se)
c.Assert(err, IsNil)
c.Assert(variable.GetDDLReorgWorkerCounter(), Equals, int32(100))
_, err = tk.Exec("set tidb_ddl_reorg_worker_cnt = -1")
_, err = tk.Exec("set @@global.tidb_ddl_reorg_worker_cnt = -1")
c.Assert(terror.ErrorEqual(err, variable.ErrWrongValueForVar), IsTrue, Commentf("err %v", err))

tk.MustExec("set tidb_ddl_reorg_worker_cnt = 100")
res := tk.MustQuery("select @@tidb_ddl_reorg_worker_cnt")
tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt = 100")
res := tk.MustQuery("select @@global.tidb_ddl_reorg_worker_cnt")
res.Check(testkit.Rows("100"))

res = tk.MustQuery("select @@global.tidb_ddl_reorg_worker_cnt")
res.Check(testkit.Rows(fmt.Sprintf("%v", variable.DefTiDBDDLReorgWorkerCount)))
res.Check(testkit.Rows("100"))
tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt = 100")
res = tk.MustQuery("select @@global.tidb_ddl_reorg_worker_cnt")
res.Check(testkit.Rows("100"))
Expand All @@ -536,28 +545,39 @@ func (s *testSuite3) TestSetDDLReorgWorkerCnt(c *C) {
func (s *testSuite3) TestSetDDLReorgBatchSize(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
err := ddlutil.LoadDDLReorgVars(tk.Se)
c.Assert(err, IsNil)
c.Assert(variable.GetDDLReorgBatchSize(), Equals, int32(variable.DefTiDBDDLReorgBatchSize))

tk.MustExec("set tidb_ddl_reorg_batch_size = 1")
tk.MustExec("set @@global.tidb_ddl_reorg_batch_size = 1")
tk.MustQuery("show warnings;").Check(testkit.Rows("Warning 1292 Truncated incorrect tidb_ddl_reorg_batch_size value: '1'"))
err = ddlutil.LoadDDLReorgVars(tk.Se)
c.Assert(err, IsNil)
c.Assert(variable.GetDDLReorgBatchSize(), Equals, int32(variable.MinDDLReorgBatchSize))
tk.MustExec(fmt.Sprintf("set tidb_ddl_reorg_batch_size = %v", variable.MaxDDLReorgBatchSize+1))
tk.MustExec(fmt.Sprintf("set @@global.tidb_ddl_reorg_batch_size = %v", variable.MaxDDLReorgBatchSize+1))
tk.MustQuery("show warnings;").Check(testkit.Rows(fmt.Sprintf("Warning 1292 Truncated incorrect tidb_ddl_reorg_batch_size value: '%d'", variable.MaxDDLReorgBatchSize+1)))
err = ddlutil.LoadDDLReorgVars(tk.Se)
c.Assert(err, IsNil)
c.Assert(variable.GetDDLReorgBatchSize(), Equals, int32(variable.MaxDDLReorgBatchSize))
_, err := tk.Exec("set tidb_ddl_reorg_batch_size = invalid_val")
_, err = tk.Exec("set @@global.tidb_ddl_reorg_batch_size = invalid_val")
c.Assert(terror.ErrorEqual(err, variable.ErrWrongTypeForVar), IsTrue, Commentf("err %v", err))
tk.MustExec("set tidb_ddl_reorg_batch_size = 100")
tk.MustExec("set @@global.tidb_ddl_reorg_batch_size = 100")
err = ddlutil.LoadDDLReorgVars(tk.Se)
c.Assert(err, IsNil)
c.Assert(variable.GetDDLReorgBatchSize(), Equals, int32(100))
tk.MustExec("set tidb_ddl_reorg_batch_size = -1")
tk.MustExec("set @@global.tidb_ddl_reorg_batch_size = -1")
tk.MustQuery("show warnings;").Check(testkit.Rows("Warning 1292 Truncated incorrect tidb_ddl_reorg_batch_size value: '-1'"))

tk.MustExec("set tidb_ddl_reorg_batch_size = 100")
res := tk.MustQuery("select @@tidb_ddl_reorg_batch_size")
tk.MustExec("set @@global.tidb_ddl_reorg_batch_size = 100")
res := tk.MustQuery("select @@global.tidb_ddl_reorg_batch_size")
res.Check(testkit.Rows("100"))

res = tk.MustQuery("select @@global.tidb_ddl_reorg_batch_size")
res.Check(testkit.Rows(fmt.Sprintf("%v", variable.DefTiDBDDLReorgBatchSize)))
res.Check(testkit.Rows(fmt.Sprintf("%v", 100)))
tk.MustExec("set @@global.tidb_ddl_reorg_batch_size = 1000")
res = tk.MustQuery("select @@global.tidb_ddl_reorg_batch_size")
res.Check(testkit.Rows("1000"))

// If do not LoadDDLReorgVars, the local variable will be the last loaded value.
c.Assert(variable.GetDDLReorgBatchSize(), Equals, int32(100))
}
7 changes: 2 additions & 5 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -825,11 +825,10 @@ func (s *session) SetGlobalSysVar(name, value string) error {
if err != nil {
return errors.Trace(err)
}

name = strings.ToLower(name)
sql := fmt.Sprintf(`REPLACE %s.%s VALUES ('%s', '%s');`,
mysql.SystemDB, mysql.GlobalVariablesTable, strings.ToLower(name), sVal)
mysql.SystemDB, mysql.GlobalVariablesTable, name, sVal)
_, _, err = s.ExecRestrictedSQL(s, sql)

return errors.Trace(err)
}

Expand Down Expand Up @@ -1450,8 +1449,6 @@ const loadCommonGlobalVarsSQL = "select HIGH_PRIORITY * from mysql.global_variab
variable.TiDBHashAggFinalConcurrency + quoteCommaQuote +
variable.TiDBBackoffLockFast + quoteCommaQuote +
variable.TiDBConstraintCheckInPlace + quoteCommaQuote +
variable.TiDBDDLReorgWorkerCount + quoteCommaQuote +
variable.TiDBDDLReorgBatchSize + quoteCommaQuote +
variable.TiDBOptInSubqToJoinAndAgg + quoteCommaQuote +
variable.TiDBDistSQLScanConcurrency + quoteCommaQuote +
variable.TiDBInitChunkSize + quoteCommaQuote +
Expand Down
14 changes: 10 additions & 4 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -681,10 +681,6 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
s.OptimizerSelectivityLevel = tidbOptPositiveInt32(val, DefTiDBOptimizerSelectivityLevel)
case TiDBEnableTablePartition:
s.EnableTablePartition = val
case TiDBDDLReorgWorkerCount:
SetDDLReorgWorkerCounter(int32(tidbOptPositiveInt32(val, DefTiDBDDLReorgWorkerCount)))
case TiDBDDLReorgBatchSize:
SetDDLReorgBatchSize(int32(tidbOptPositiveInt32(val, DefTiDBDDLReorgBatchSize)))
case TiDBDDLReorgPriority:
s.setDDLReorgPriority(val)
case TiDBForcePriority:
Expand All @@ -698,6 +694,16 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
return nil
}

// SetLocalSystemVar sets values of the local variables which in "server" scope.
func SetLocalSystemVar(name string, val string) {
switch name {
case TiDBDDLReorgWorkerCount:
SetDDLReorgWorkerCounter(int32(tidbOptPositiveInt32(val, DefTiDBDDLReorgWorkerCount)))
case TiDBDDLReorgBatchSize:
SetDDLReorgBatchSize(int32(tidbOptPositiveInt32(val, DefTiDBDDLReorgBatchSize)))
}
}

// special session variables.
const (
SQLModeVar = "sql_mode"
Expand Down
4 changes: 2 additions & 2 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -672,8 +672,8 @@ var defaultSysVars = []*SysVar{
{ScopeSession, TiDBSlowLogThreshold, strconv.Itoa(logutil.DefaultSlowThreshold)},
{ScopeSession, TiDBQueryLogMaxLen, strconv.Itoa(logutil.DefaultQueryLogMaxLen)},
{ScopeSession, TiDBConfig, ""},
{ScopeGlobal | ScopeSession, TiDBDDLReorgWorkerCount, strconv.Itoa(DefTiDBDDLReorgWorkerCount)},
{ScopeGlobal | ScopeSession, TiDBDDLReorgBatchSize, strconv.Itoa(DefTiDBDDLReorgBatchSize)},
{ScopeGlobal, TiDBDDLReorgWorkerCount, strconv.Itoa(DefTiDBDDLReorgWorkerCount)},
{ScopeGlobal, TiDBDDLReorgBatchSize, strconv.Itoa(DefTiDBDDLReorgBatchSize)},
{ScopeSession, TiDBDDLReorgPriority, "PRIORITY_LOW"},
{ScopeSession, TiDBForcePriority, mysql.Priority2Str[DefTiDBForcePriority]},
{ScopeSession, TiDBEnableRadixJoin, boolToIntStr(DefTiDBUseRadixJoin)},
Expand Down
4 changes: 0 additions & 4 deletions sessionctx/variable/varsutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,10 +216,6 @@ func (s *testVarsutilSuite) TestVarsutil(c *C) {
SetSessionSystemVar(v, TiDBOptimizerSelectivityLevel, types.NewIntDatum(1))
c.Assert(v.OptimizerSelectivityLevel, Equals, 1)

c.Assert(GetDDLReorgWorkerCounter(), Equals, int32(DefTiDBDDLReorgWorkerCount))
SetSessionSystemVar(v, TiDBDDLReorgWorkerCount, types.NewIntDatum(1))
c.Assert(GetDDLReorgWorkerCounter(), Equals, int32(1))

err = SetSessionSystemVar(v, TiDBDDLReorgWorkerCount, types.NewIntDatum(-1))
c.Assert(terror.ErrorEqual(err, ErrWrongValueForVar), IsTrue)

Expand Down

0 comments on commit 858b200

Please sign in to comment.