Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: rename and add tidb_wait_split_region_timeout session variable to set wait split region timeout. (#10797) #11166

Merged
merged 1 commit into from
Jul 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1303,7 +1303,9 @@ func (d *ddl) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt) (err e
var preSplitAndScatter func()
// do pre-split and scatter.
if tbInfo.ShardRowIDBits > 0 && tbInfo.PreSplitRegions > 0 {
preSplitAndScatter = func() { preSplitTableRegion(d.store, tbInfo, ctx.GetSessionVars().WaitTableSplitFinish) }
preSplitAndScatter = func() {
preSplitTableShardRowIDBitsRegion(d.store, tbInfo, ctx.GetSessionVars().WaitSplitRegionFinish)
}
} else if atomic.LoadUint32(&EnableSplitTableRegion) != 0 {
pi := tbInfo.GetPartitionInfo()
if pi != nil {
Expand All @@ -1313,7 +1315,7 @@ func (d *ddl) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt) (err e
}
}
if preSplitAndScatter != nil {
if ctx.GetSessionVars().WaitTableSplitFinish {
if ctx.GetSessionVars().WaitSplitRegionFinish {
preSplitAndScatter()
} else {
go preSplitAndScatter()
Expand Down
2 changes: 1 addition & 1 deletion ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ func splitTableRegion(store kv.Storage, tableID int64) {
}
}

func preSplitTableRegion(store kv.Storage, tblInfo *model.TableInfo, waitTableSplitFinish bool) {
func preSplitTableShardRowIDBitsRegion(store kv.Storage, tblInfo *model.TableInfo, waitTableSplitFinish bool) {
s, ok := store.(splitableStore)
if !ok {
return
Expand Down
20 changes: 20 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1953,6 +1953,26 @@ func (s *testSuite) TestPointGetRepeatableRead(c *C) {
c.Assert(failpoint.Disable(step2), IsNil)
}

func (s *testSuite) TestSplitRegionTimeout(c *C) {
c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/mockSplitRegionTimeout", `return(true)`), IsNil)
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a varchar(100),b int, index idx1(b,a))")
tk.MustExec(`split table t index idx1 by (10000,"abcd"),(10000000);`)
tk.MustExec(`set @@tidb_wait_split_region_timeout=1`)
_, err := tk.Exec(`split table t between (0) and (10000) regions 10`)
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "split region timeout(1s)")
c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/mockSplitRegionTimeout"), IsNil)

c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/mockScatterRegionTimeout", `return(true)`), IsNil)
_, err = tk.Exec(`split table t between (0) and (10000) regions 10`)
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "wait split region scatter timeout(1s)")
c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/mockScatterRegionTimeout"), IsNil)
}

func (s *testSuite) TestRow(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down
22 changes: 16 additions & 6 deletions executor/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package executor_test

import (
"context"
"strconv"

. "github.com/pingcap/check"
"github.com/pingcap/parser/terror"
Expand Down Expand Up @@ -340,12 +341,21 @@ func (s *testSuite2) TestSetVar(c *C) {
_, err = tk.Exec("set global read_only = abc")
c.Assert(err, NotNil)

// test for tidb_wait_table_split_finish
tk.MustQuery(`select @@session.tidb_wait_table_split_finish;`).Check(testkit.Rows("0"))
tk.MustExec("set tidb_wait_table_split_finish = 1")
tk.MustQuery(`select @@session.tidb_wait_table_split_finish;`).Check(testkit.Rows("1"))
tk.MustExec("set tidb_wait_table_split_finish = 0")
tk.MustQuery(`select @@session.tidb_wait_table_split_finish;`).Check(testkit.Rows("0"))
// test for tidb_wait_split_region_finish
tk.MustQuery(`select @@session.tidb_wait_split_region_finish;`).Check(testkit.Rows("1"))
tk.MustExec("set tidb_wait_split_region_finish = 1")
tk.MustQuery(`select @@session.tidb_wait_split_region_finish;`).Check(testkit.Rows("1"))
tk.MustExec("set tidb_wait_split_region_finish = 0")
tk.MustQuery(`select @@session.tidb_wait_split_region_finish;`).Check(testkit.Rows("0"))

// test for tidb_wait_split_region_timeout
tk.MustQuery(`select @@session.tidb_wait_split_region_timeout;`).Check(testkit.Rows(strconv.Itoa(variable.DefWaitSplitRegionTimeout)))
tk.MustExec("set tidb_wait_split_region_timeout = 1")
tk.MustQuery(`select @@session.tidb_wait_split_region_timeout;`).Check(testkit.Rows("1"))
_, err = tk.Exec("set tidb_wait_split_region_timeout = 0")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "tidb_wait_split_region_timeout(0) cannot be smaller than 1")
tk.MustQuery(`select @@session.tidb_wait_split_region_timeout;`).Check(testkit.Rows("1"))

tk.MustExec("set session tidb_back_off_weight = 3")
tk.MustQuery("select @@session.tidb_back_off_weight;").Check(testkit.Rows("3"))
Expand Down
48 changes: 46 additions & 2 deletions executor/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ import (
"context"
"encoding/binary"
"math"
"time"

"github.com/cznic/mathutil"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -60,6 +62,9 @@ func (e *SplitIndexRegionExec) Next(ctx context.Context, _ *chunk.Chunk) error {
if err != nil {
return err
}

ctxWithTimeout, cancel := context.WithTimeout(ctx, e.ctx.GetSessionVars().GetSplitRegionTimeout())
defer cancel()
regionIDs := make([]uint64, 0, len(splitIdxKeys))
for _, idxKey := range splitIdxKeys {
regionID, err := s.SplitRegionAndScatter(idxKey)
Expand All @@ -72,8 +77,11 @@ func (e *SplitIndexRegionExec) Next(ctx context.Context, _ *chunk.Chunk) error {
}
regionIDs = append(regionIDs, regionID)

if isCtxDone(ctxWithTimeout) {
return errors.Errorf("wait split region timeout(%v)", e.ctx.GetSessionVars().GetSplitRegionTimeout())
}
}
if !e.ctx.GetSessionVars().WaitTableSplitFinish {
if !e.ctx.GetSessionVars().WaitSplitRegionFinish {
return nil
}
for _, regionID := range regionIDs {
Expand All @@ -85,6 +93,9 @@ func (e *SplitIndexRegionExec) Next(ctx context.Context, _ *chunk.Chunk) error {
zap.String("index", e.indexInfo.Name.L),
zap.Error(err))
}
if isCtxDone(ctxWithTimeout) {
return errors.Errorf("wait split region timeout(%v)", e.ctx.GetSessionVars().GetSplitRegionTimeout())
}
}
return nil
}
Expand Down Expand Up @@ -224,6 +235,10 @@ func (e *SplitTableRegionExec) Next(ctx context.Context, _ *chunk.Chunk) error {
if !ok {
return nil
}

ctxWithTimeout, cancel := context.WithTimeout(ctx, e.ctx.GetSessionVars().GetSplitRegionTimeout())
defer cancel()

splitKeys, err := e.getSplitTableKeys()
if err != nil {
return err
Expand All @@ -238,8 +253,18 @@ func (e *SplitTableRegionExec) Next(ctx context.Context, _ *chunk.Chunk) error {
continue
}
regionIDs = append(regionIDs, regionID)

failpoint.Inject("mockSplitRegionTimeout", func(val failpoint.Value) {
if val.(bool) {
time.Sleep(time.Second * 1)
}
})

if isCtxDone(ctxWithTimeout) {
return errors.Errorf("split region timeout(%v)", e.ctx.GetSessionVars().GetSplitRegionTimeout())
}
}
if !e.ctx.GetSessionVars().WaitTableSplitFinish {
if !e.ctx.GetSessionVars().WaitSplitRegionFinish {
return nil
}
for _, regionID := range regionIDs {
Expand All @@ -250,10 +275,29 @@ func (e *SplitTableRegionExec) Next(ctx context.Context, _ *chunk.Chunk) error {
zap.String("table", e.tableInfo.Name.L),
zap.Error(err))
}

failpoint.Inject("mockScatterRegionTimeout", func(val failpoint.Value) {
if val.(bool) {
time.Sleep(time.Second * 1)
}
})

if isCtxDone(ctxWithTimeout) {
return errors.Errorf("wait split region scatter timeout(%v)", e.ctx.GetSessionVars().GetSplitRegionTimeout())
}
}
return nil
}

func isCtxDone(ctx context.Context) bool {
select {
case <-ctx.Done():
return true
default:
return false
}
}

var minRegionStepValue = uint64(1000)

func (e *SplitTableRegionExec) getSplitTableKeys() ([][]byte, error) {
Expand Down
20 changes: 16 additions & 4 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,8 +342,11 @@ type SessionVars struct {
// DDLReorgPriority is the operation priority of adding indices.
DDLReorgPriority int

// WaitTableSplitFinish defines the create table pre-split behaviour is sync or async.
WaitTableSplitFinish bool
// WaitSplitRegionFinish defines the split region behaviour is sync or async.
WaitSplitRegionFinish bool

// WaitSplitRegionTimeout defines the split region timeout.
WaitSplitRegionTimeout uint64

// EnableStreaming indicates whether the coprocessor request can use streaming API.
// TODO: remove this after tidb-server configuration "enable-streaming' removed.
Expand Down Expand Up @@ -435,6 +438,8 @@ func NewSessionVars() *SessionVars {
CommandValue: uint32(mysql.ComSleep),
TiDBOptJoinReorderThreshold: DefTiDBOptJoinReorderThreshold,
SlowQueryFile: config.GetGlobalConfig().Log.SlowQueryFile,
WaitSplitRegionFinish: DefTiDBWaitSplitRegionFinish,
WaitSplitRegionTimeout: DefWaitSplitRegionTimeout,
}
vars.Concurrency = Concurrency{
IndexLookupConcurrency: DefIndexLookupConcurrency,
Expand Down Expand Up @@ -479,6 +484,11 @@ func (s *SessionVars) GetWriteStmtBufs() *WriteStmtBufs {
return &s.writeStmtBufs
}

// GetSplitRegionTimeout gets split region timeout.
func (s *SessionVars) GetSplitRegionTimeout() time.Duration {
return time.Duration(s.WaitSplitRegionTimeout) * time.Second
}

// CleanBuffers cleans the temporary bufs
func (s *SessionVars) CleanBuffers() {
if !s.LightningMode {
Expand Down Expand Up @@ -798,8 +808,10 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
s.SlowQueryFile = val
case TiDBEnableFastAnalyze:
s.EnableFastAnalyze = TiDBOptOn(val)
case TiDBWaitTableSplitFinish:
s.WaitTableSplitFinish = TiDBOptOn(val)
case TiDBWaitSplitRegionFinish:
s.WaitSplitRegionFinish = TiDBOptOn(val)
case TiDBWaitSplitRegionTimeout:
s.WaitSplitRegionTimeout = uint64(tidbOptPositiveInt32(val, DefWaitSplitRegionTimeout))
case TiDBExpensiveQueryTimeThreshold:
atomic.StoreUint64(&ExpensiveQueryTimeThreshold, uint64(tidbOptPositiveInt32(val, DefTiDBExpensiveQueryTimeThreshold)))
case TiDBTxnMode:
Expand Down
3 changes: 2 additions & 1 deletion sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -695,7 +695,8 @@ var defaultSysVars = []*SysVar{
{ScopeGlobal | ScopeSession, TiDBOptJoinReorderThreshold, strconv.Itoa(DefTiDBOptJoinReorderThreshold)},
{ScopeSession, TiDBCheckMb4ValueInUTF8, BoolToIntStr(config.GetGlobalConfig().CheckMb4ValueInUTF8)},
{ScopeSession, TiDBSlowQueryFile, ""},
{ScopeSession, TiDBWaitTableSplitFinish, BoolToIntStr(DefTiDBWaitTableSplitFinish)},
{ScopeSession, TiDBWaitSplitRegionFinish, BoolToIntStr(DefTiDBWaitSplitRegionFinish)},
{ScopeSession, TiDBWaitSplitRegionTimeout, strconv.Itoa(DefWaitSplitRegionTimeout)},
{ScopeSession, TiDBLowResolutionTSO, "0"},
{ScopeSession, TiDBExpensiveQueryTimeThreshold, strconv.Itoa(DefTiDBExpensiveQueryTimeThreshold)},
}
Expand Down
12 changes: 8 additions & 4 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,11 @@ const (
// It can be: PRIORITY_LOW, PRIORITY_NORMAL, PRIORITY_HIGH
TiDBDDLReorgPriority = "tidb_ddl_reorg_priority"

// TiDBWaitTableSplitFinish defines the create table pre-split behaviour is sync or async.
TiDBWaitTableSplitFinish = "tidb_wait_table_split_finish"
// TiDBWaitSplitRegionFinish defines the split region behaviour is sync or async.
TiDBWaitSplitRegionFinish = "tidb_wait_split_region_finish"

// TiDBWaitSplitRegionTimeout uses to set the split and scatter region back off time.
TiDBWaitSplitRegionTimeout = "tidb_wait_split_region_timeout"

// tidb_force_priority defines the operations priority of all statements.
// It can be "NO_PRIORITY", "LOW_PRIORITY", "HIGH_PRIORITY", "DELAYED"
Expand Down Expand Up @@ -333,8 +336,9 @@ const (
DefTiDBDDLSlowOprThreshold = 300
DefTiDBUseFastAnalyze = false
DefTiDBSkipIsolationLevelCheck = false
DefTiDBWaitTableSplitFinish = false
DefTiDBExpensiveQueryTimeThreshold = 60 // 60s
DefTiDBWaitSplitRegionFinish = true
DefTiDBExpensiveQueryTimeThreshold = 60 // 60s
DefWaitSplitRegionTimeout = 300 // 300s
)

// Process global variables.
Expand Down
8 changes: 8 additions & 0 deletions sessionctx/variable/varsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,14 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string,
if v < 0 || v >= 64 {
return value, errors.Errorf("tidb_join_order_algo_threshold(%d) cannot be smaller than 0 or larger than 63", v)
}
case TiDBWaitSplitRegionTimeout:
v, err := strconv.Atoi(value)
if err != nil {
return value, ErrWrongTypeForVar.GenWithStackByArgs(name)
}
if v <= 0 {
return value, errors.Errorf("tidb_wait_split_region_timeout(%d) cannot be smaller than 1", v)
}
}
return value, nil
}
Expand Down