Skip to content

Commit

Permalink
*: rename and add tidb_wait_split_region_timeout session variable to …
Browse files Browse the repository at this point in the history
…set wait split region timeout. (pingcap#10797)
  • Loading branch information
crazycs520 committed Jun 20, 2019
1 parent 380049e commit af8bb6b
Show file tree
Hide file tree
Showing 9 changed files with 119 additions and 20 deletions.
6 changes: 3 additions & 3 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1171,10 +1171,10 @@ func (d *ddl) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt) (err e
if err == nil {
// do pre-split and scatter.
if tbInfo.ShardRowIDBits > 0 && tbInfo.PreSplitRegions > 0 {
if ctx.GetSessionVars().WaitTableSplitFinish {
preSplitTableRegion(d.store, tbInfo, true)
if ctx.GetSessionVars().WaitSplitRegionFinish {
preSplitTableShardRowIDBitsRegion(d.store, tbInfo, true)
} else {
go preSplitTableRegion(d.store, tbInfo, false)
go preSplitTableShardRowIDBitsRegion(d.store, tbInfo, false)
}
}
if tbInfo.AutoIncID > 1 {
Expand Down
2 changes: 1 addition & 1 deletion ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func splitTableRegion(store kv.Storage, tableID int64) {
}
}

func preSplitTableRegion(store kv.Storage, tblInfo *model.TableInfo, waitTableSplitFinish bool) {
func preSplitTableShardRowIDBitsRegion(store kv.Storage, tblInfo *model.TableInfo, waitTableSplitFinish bool) {
s, ok := store.(splitableStore)
if !ok {
return
Expand Down
20 changes: 20 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1908,6 +1908,26 @@ func (s *testSuite) TestPointGetRepeatableRead(c *C) {
c.Assert(failpoint.Disable(step2), IsNil)
}

func (s *testSuite) TestSplitRegionTimeout(c *C) {
c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/mockSplitRegionTimeout", `return(true)`), IsNil)
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a varchar(100),b int, index idx1(b,a))")
tk.MustExec(`split table t index idx1 by (10000,"abcd"),(10000000);`)
tk.MustExec(`set @@tidb_wait_split_region_timeout=1`)
_, err := tk.Exec(`split table t between (0) and (10000) regions 10`)
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "split region timeout(1s)")
c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/mockSplitRegionTimeout"), IsNil)

c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/mockScatterRegionTimeout", `return(true)`), IsNil)
_, err = tk.Exec(`split table t between (0) and (10000) regions 10`)
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "wait split region scatter timeout(1s)")
c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/mockScatterRegionTimeout"), IsNil)
}

func (s *testSuite) TestRow(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down
23 changes: 17 additions & 6 deletions executor/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package executor_test

import (
"strconv"

. "github.com/pingcap/check"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/config"
Expand Down Expand Up @@ -258,12 +260,21 @@ func (s *testSuite) TestSetVar(c *C) {
tk.MustExec("set global tidb_constraint_check_in_place = 0")
tk.MustQuery(`select @@global.tidb_constraint_check_in_place;`).Check(testkit.Rows("0"))

// test for tidb_wait_table_split_finish
tk.MustQuery(`select @@session.tidb_wait_table_split_finish;`).Check(testkit.Rows("0"))
tk.MustExec("set tidb_wait_table_split_finish = 1")
tk.MustQuery(`select @@session.tidb_wait_table_split_finish;`).Check(testkit.Rows("1"))
tk.MustExec("set tidb_wait_table_split_finish = 0")
tk.MustQuery(`select @@session.tidb_wait_table_split_finish;`).Check(testkit.Rows("0"))
// test for tidb_wait_split_region_finish
tk.MustQuery(`select @@session.tidb_wait_split_region_finish;`).Check(testkit.Rows("0"))
tk.MustExec("set tidb_wait_split_region_finish = 1")
tk.MustQuery(`select @@session.tidb_wait_split_region_finish;`).Check(testkit.Rows("1"))
tk.MustExec("set tidb_wait_split_region_finish = 0")
tk.MustQuery(`select @@session.tidb_wait_split_region_finish;`).Check(testkit.Rows("0"))

// test for tidb_wait_split_region_timeout
tk.MustQuery(`select @@session.tidb_wait_split_region_timeout;`).Check(testkit.Rows(strconv.Itoa(variable.DefWaitSplitRegionTimeout)))
tk.MustExec("set tidb_wait_split_region_timeout = 1")
tk.MustQuery(`select @@session.tidb_wait_split_region_timeout;`).Check(testkit.Rows("1"))
_, err = tk.Exec("set tidb_wait_split_region_timeout = 0")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "tidb_wait_split_region_timeout(0) cannot be smaller than 1")
tk.MustQuery(`select @@session.tidb_wait_split_region_timeout;`).Check(testkit.Rows("1"))
}

func (s *testSuite) TestSetCharset(c *C) {
Expand Down
47 changes: 45 additions & 2 deletions executor/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ import (
"context"
"encoding/binary"
"math"
"time"

"github.com/cznic/mathutil"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -60,6 +62,9 @@ func (e *SplitIndexRegionExec) Next(ctx context.Context, _ *chunk.Chunk) error {
if err != nil {
return err
}

ctxWithTimeout, cancel := context.WithTimeout(ctx, e.ctx.GetSessionVars().GetSplitRegionTimeout())
defer cancel()
regionIDs := make([]uint64, 0, len(splitIdxKeys))
for _, idxKey := range splitIdxKeys {
regionID, err := s.SplitRegionAndScatter(idxKey)
Expand All @@ -72,8 +77,11 @@ func (e *SplitIndexRegionExec) Next(ctx context.Context, _ *chunk.Chunk) error {
}
regionIDs = append(regionIDs, regionID)

if isCtxDone(ctxWithTimeout) {
return errors.Errorf("split region timeout(%v)", e.ctx.GetSessionVars().GetSplitRegionTimeout())
}
}
if !e.ctx.GetSessionVars().WaitTableSplitFinish {
if !e.ctx.GetSessionVars().WaitSplitRegionFinish {
return nil
}
for _, regionID := range regionIDs {
Expand All @@ -85,6 +93,9 @@ func (e *SplitIndexRegionExec) Next(ctx context.Context, _ *chunk.Chunk) error {
zap.String("index", e.indexInfo.Name.L),
zap.Error(err))
}
if isCtxDone(ctxWithTimeout) {
return errors.Errorf("wait split region scatter timeout(%v)", e.ctx.GetSessionVars().GetSplitRegionTimeout())
}
}
return nil
}
Expand Down Expand Up @@ -228,6 +239,10 @@ func (e *SplitTableRegionExec) Next(ctx context.Context, _ *chunk.Chunk) error {
if err != nil {
return err
}

ctxWithTimeout, cancel := context.WithTimeout(ctx, e.ctx.GetSessionVars().GetSplitRegionTimeout())
defer cancel()

regionIDs := make([]uint64, 0, len(splitKeys))
for _, key := range splitKeys {
regionID, err := s.SplitRegionAndScatter(key)
Expand All @@ -238,8 +253,18 @@ func (e *SplitTableRegionExec) Next(ctx context.Context, _ *chunk.Chunk) error {
continue
}
regionIDs = append(regionIDs, regionID)

failpoint.Inject("mockSplitRegionTimeout", func(val failpoint.Value) {
if val.(bool) {
time.Sleep(time.Second * 1)
}
})

if isCtxDone(ctxWithTimeout) {
return errors.Errorf("split region timeout(%v)", e.ctx.GetSessionVars().GetSplitRegionTimeout())
}
}
if !e.ctx.GetSessionVars().WaitTableSplitFinish {
if !e.ctx.GetSessionVars().WaitSplitRegionFinish {
return nil
}
for _, regionID := range regionIDs {
Expand All @@ -250,10 +275,28 @@ func (e *SplitTableRegionExec) Next(ctx context.Context, _ *chunk.Chunk) error {
zap.String("table", e.tableInfo.Name.L),
zap.Error(err))
}
failpoint.Inject("mockScatterRegionTimeout", func(val failpoint.Value) {
if val.(bool) {
time.Sleep(time.Second * 1)
}
})

if isCtxDone(ctxWithTimeout) {
return errors.Errorf("wait split region scatter timeout(%v)", e.ctx.GetSessionVars().GetSplitRegionTimeout())
}
}
return nil
}

func isCtxDone(ctx context.Context) bool {
select {
case <-ctx.Done():
return true
default:
return false
}
}

var minRegionStepValue = uint64(1000)

func (e *SplitTableRegionExec) getSplitTableKeys() ([][]byte, error) {
Expand Down
20 changes: 16 additions & 4 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,8 +305,11 @@ type SessionVars struct {
// DDLReorgPriority is the operation priority of adding indices.
DDLReorgPriority int

// WaitTableSplitFinish defines the create table pre-split behaviour is sync or async.
WaitTableSplitFinish bool
// WaitSplitRegionFinish defines the create table pre-split behaviour is sync or async.
WaitSplitRegionFinish bool

// WaitSplitRegionTimeout defines the split region timeout.
WaitSplitRegionTimeout uint64

// EnableStreaming indicates whether the coprocessor request can use streaming API.
// TODO: remove this after tidb-server configuration "enable-streaming' removed.
Expand Down Expand Up @@ -367,6 +370,8 @@ func NewSessionVars() *SessionVars {
DisableTxnAutoRetry: DefTiDBDisableTxnAutoRetry,
DDLReorgPriority: kv.PriorityLow,
SlowQueryFile: config.GetGlobalConfig().Log.SlowQueryFile,
WaitSplitRegionFinish: DefTiDBWaitSplitRegionFinish,
WaitSplitRegionTimeout: DefWaitSplitRegionTimeout,
}
vars.Concurrency = Concurrency{
IndexLookupConcurrency: DefIndexLookupConcurrency,
Expand Down Expand Up @@ -417,6 +422,11 @@ func (s *SessionVars) CleanBuffers() {
}
}

// GetSplitRegionTimeout gets split region timeout.
func (s *SessionVars) GetSplitRegionTimeout() time.Duration {
return time.Duration(s.WaitSplitRegionTimeout) * time.Second
}

// AllocPlanColumnID allocates column id for plan.
func (s *SessionVars) AllocPlanColumnID() int64 {
s.PlanColumnID++
Expand Down Expand Up @@ -649,8 +659,10 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
config.GetGlobalConfig().CheckMb4ValueInUTF8 = TiDBOptOn(val)
case TiDBSlowQueryFile:
s.SlowQueryFile = val
case TiDBWaitTableSplitFinish:
s.WaitTableSplitFinish = TiDBOptOn(val)
case TiDBWaitSplitRegionFinish:
s.WaitSplitRegionFinish = TiDBOptOn(val)
case TiDBWaitSplitRegionTimeout:
s.WaitSplitRegionTimeout = uint64(tidbOptPositiveInt32(val, DefWaitSplitRegionTimeout))
}
s.systems[name] = val
return nil
Expand Down
3 changes: 2 additions & 1 deletion sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,8 @@ var defaultSysVars = []*SysVar{
{ScopeSession, TiDBForcePriority, mysql.Priority2Str[DefTiDBForcePriority]},
{ScopeSession, TiDBCheckMb4ValueInUTF8, BoolToIntStr(config.GetGlobalConfig().CheckMb4ValueInUTF8)},
{ScopeSession, TiDBSlowQueryFile, ""},
{ScopeSession, TiDBWaitTableSplitFinish, BoolToIntStr(DefTiDBWaitTableSplitFinish)},
{ScopeSession, TiDBWaitSplitRegionFinish, BoolToIntStr(DefTiDBWaitSplitRegionFinish)},
{ScopeSession, TiDBWaitSplitRegionTimeout, strconv.Itoa(DefWaitSplitRegionTimeout)},
}

// SynonymsSysVariables is synonyms of system variables.
Expand Down
10 changes: 7 additions & 3 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,11 @@ const (
// It can be: PRIORITY_LOW, PRIORITY_NORMAL, PRIORITY_HIGH
TiDBDDLReorgPriority = "tidb_ddl_reorg_priority"

// TiDBWaitTableSplitFinish defines the create table pre-split behaviour is sync or async.
TiDBWaitTableSplitFinish = "tidb_wait_table_split_finish"
// TiDBWaitSplitRegionFinish defines the create table pre-split behaviour is sync or async.
TiDBWaitSplitRegionFinish = "tidb_wait_table_split_finish"

// TiDBWaitSplitRegionTimeout uses to set the split and scatter region back off time.
TiDBWaitSplitRegionTimeout = "tidb_wait_split_region_timeout"

// tidb_force_priority defines the operations priority of all statements.
// It can be "NO_PRIORITY", "LOW_PRIORITY", "HIGH_PRIORITY", "DELAYED"
Expand Down Expand Up @@ -261,7 +264,8 @@ const (
DefTiDBHashAggPartialConcurrency = 4
DefTiDBHashAggFinalConcurrency = 4
DefTiDBForcePriority = mysql.NoPriority
DefTiDBWaitTableSplitFinish = false
DefTiDBWaitSplitRegionFinish = true
DefWaitSplitRegionTimeout = 300 // 300s
)

// Process global variables.
Expand Down
8 changes: 8 additions & 0 deletions sessionctx/variable/varsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,14 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string,
return "", ErrWrongValueForVar.GenWithStackByArgs(name, value)
}
return upVal, nil
case TiDBWaitSplitRegionTimeout:
v, err := strconv.Atoi(value)
if err != nil {
return value, ErrWrongTypeForVar.GenWithStackByArgs(name)
}
if v <= 0 {
return value, errors.Errorf("tidb_wait_split_region_timeout(%d) cannot be smaller than 1", v)
}
}
return value, nil
}
Expand Down

0 comments on commit af8bb6b

Please sign in to comment.