diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index 8356d59269ae2..ebb79c613df43 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -1303,7 +1303,9 @@ func (d *ddl) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt) (err e var preSplitAndScatter func() // do pre-split and scatter. if tbInfo.ShardRowIDBits > 0 && tbInfo.PreSplitRegions > 0 { - preSplitAndScatter = func() { preSplitTableRegion(d.store, tbInfo, ctx.GetSessionVars().WaitTableSplitFinish) } + preSplitAndScatter = func() { + preSplitTableShardRowIDBitsRegion(d.store, tbInfo, ctx.GetSessionVars().WaitSplitRegionFinish) + } } else if atomic.LoadUint32(&EnableSplitTableRegion) != 0 { pi := tbInfo.GetPartitionInfo() if pi != nil { @@ -1313,7 +1315,7 @@ func (d *ddl) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt) (err e } } if preSplitAndScatter != nil { - if ctx.GetSessionVars().WaitTableSplitFinish { + if ctx.GetSessionVars().WaitSplitRegionFinish { preSplitAndScatter() } else { go preSplitAndScatter() diff --git a/ddl/table.go b/ddl/table.go index 3232fa3ab8ae2..99ea39c80291c 100644 --- a/ddl/table.go +++ b/ddl/table.go @@ -365,7 +365,7 @@ func splitTableRegion(store kv.Storage, tableID int64) { } } -func preSplitTableRegion(store kv.Storage, tblInfo *model.TableInfo, waitTableSplitFinish bool) { +func preSplitTableShardRowIDBitsRegion(store kv.Storage, tblInfo *model.TableInfo, waitTableSplitFinish bool) { s, ok := store.(splitableStore) if !ok { return diff --git a/executor/executor_test.go b/executor/executor_test.go index 60aaa16226967..e93ba0621f831 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -1953,6 +1953,26 @@ func (s *testSuite) TestPointGetRepeatableRead(c *C) { c.Assert(failpoint.Disable(step2), IsNil) } +func (s *testSuite) TestSplitRegionTimeout(c *C) { + c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/mockSplitRegionTimeout", `return(true)`), IsNil) + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a varchar(100),b int, index idx1(b,a))") + tk.MustExec(`split table t index idx1 by (10000,"abcd"),(10000000);`) + tk.MustExec(`set @@tidb_wait_split_region_timeout=1`) + _, err := tk.Exec(`split table t between (0) and (10000) regions 10`) + c.Assert(err, NotNil) + c.Assert(err.Error(), Equals, "split region timeout(1s)") + c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/mockSplitRegionTimeout"), IsNil) + + c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/mockScatterRegionTimeout", `return(true)`), IsNil) + _, err = tk.Exec(`split table t between (0) and (10000) regions 10`) + c.Assert(err, NotNil) + c.Assert(err.Error(), Equals, "wait split region scatter timeout(1s)") + c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/mockScatterRegionTimeout"), IsNil) +} + func (s *testSuite) TestRow(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") diff --git a/executor/set_test.go b/executor/set_test.go index 54c680d1dce2f..b1a1445c43d2c 100644 --- a/executor/set_test.go +++ b/executor/set_test.go @@ -15,6 +15,7 @@ package executor_test import ( "context" + "strconv" . "github.com/pingcap/check" "github.com/pingcap/parser/terror" @@ -340,12 +341,21 @@ func (s *testSuite2) TestSetVar(c *C) { _, err = tk.Exec("set global read_only = abc") c.Assert(err, NotNil) - // test for tidb_wait_table_split_finish - tk.MustQuery(`select @@session.tidb_wait_table_split_finish;`).Check(testkit.Rows("0")) - tk.MustExec("set tidb_wait_table_split_finish = 1") - tk.MustQuery(`select @@session.tidb_wait_table_split_finish;`).Check(testkit.Rows("1")) - tk.MustExec("set tidb_wait_table_split_finish = 0") - tk.MustQuery(`select @@session.tidb_wait_table_split_finish;`).Check(testkit.Rows("0")) + // test for tidb_wait_split_region_finish + tk.MustQuery(`select @@session.tidb_wait_split_region_finish;`).Check(testkit.Rows("1")) + tk.MustExec("set tidb_wait_split_region_finish = 1") + tk.MustQuery(`select @@session.tidb_wait_split_region_finish;`).Check(testkit.Rows("1")) + tk.MustExec("set tidb_wait_split_region_finish = 0") + tk.MustQuery(`select @@session.tidb_wait_split_region_finish;`).Check(testkit.Rows("0")) + + // test for tidb_wait_split_region_timeout + tk.MustQuery(`select @@session.tidb_wait_split_region_timeout;`).Check(testkit.Rows(strconv.Itoa(variable.DefWaitSplitRegionTimeout))) + tk.MustExec("set tidb_wait_split_region_timeout = 1") + tk.MustQuery(`select @@session.tidb_wait_split_region_timeout;`).Check(testkit.Rows("1")) + _, err = tk.Exec("set tidb_wait_split_region_timeout = 0") + c.Assert(err, NotNil) + c.Assert(err.Error(), Equals, "tidb_wait_split_region_timeout(0) cannot be smaller than 1") + tk.MustQuery(`select @@session.tidb_wait_split_region_timeout;`).Check(testkit.Rows("1")) tk.MustExec("set session tidb_back_off_weight = 3") tk.MustQuery("select @@session.tidb_back_off_weight;").Check(testkit.Rows("3")) diff --git a/executor/split.go b/executor/split.go index ade0682692379..03fa8d428ea28 100644 --- a/executor/split.go +++ b/executor/split.go @@ -18,9 +18,11 @@ import ( "context" "encoding/binary" "math" + "time" "github.com/cznic/mathutil" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/kv" @@ -60,6 +62,9 @@ func (e *SplitIndexRegionExec) Next(ctx context.Context, _ *chunk.Chunk) error { if err != nil { return err } + + ctxWithTimeout, cancel := context.WithTimeout(ctx, e.ctx.GetSessionVars().GetSplitRegionTimeout()) + defer cancel() regionIDs := make([]uint64, 0, len(splitIdxKeys)) for _, idxKey := range splitIdxKeys { regionID, err := s.SplitRegionAndScatter(idxKey) @@ -72,8 +77,11 @@ func (e *SplitIndexRegionExec) Next(ctx context.Context, _ *chunk.Chunk) error { } regionIDs = append(regionIDs, regionID) + if isCtxDone(ctxWithTimeout) { + return errors.Errorf("wait split region timeout(%v)", e.ctx.GetSessionVars().GetSplitRegionTimeout()) + } } - if !e.ctx.GetSessionVars().WaitTableSplitFinish { + if !e.ctx.GetSessionVars().WaitSplitRegionFinish { return nil } for _, regionID := range regionIDs { @@ -85,6 +93,9 @@ func (e *SplitIndexRegionExec) Next(ctx context.Context, _ *chunk.Chunk) error { zap.String("index", e.indexInfo.Name.L), zap.Error(err)) } + if isCtxDone(ctxWithTimeout) { + return errors.Errorf("wait split region timeout(%v)", e.ctx.GetSessionVars().GetSplitRegionTimeout()) + } } return nil } @@ -224,6 +235,10 @@ func (e *SplitTableRegionExec) Next(ctx context.Context, _ *chunk.Chunk) error { if !ok { return nil } + + ctxWithTimeout, cancel := context.WithTimeout(ctx, e.ctx.GetSessionVars().GetSplitRegionTimeout()) + defer cancel() + splitKeys, err := e.getSplitTableKeys() if err != nil { return err @@ -238,8 +253,18 @@ func (e *SplitTableRegionExec) Next(ctx context.Context, _ *chunk.Chunk) error { continue } regionIDs = append(regionIDs, regionID) + + failpoint.Inject("mockSplitRegionTimeout", func(val failpoint.Value) { + if val.(bool) { + time.Sleep(time.Second * 1) + } + }) + + if isCtxDone(ctxWithTimeout) { + return errors.Errorf("split region timeout(%v)", e.ctx.GetSessionVars().GetSplitRegionTimeout()) + } } - if !e.ctx.GetSessionVars().WaitTableSplitFinish { + if !e.ctx.GetSessionVars().WaitSplitRegionFinish { return nil } for _, regionID := range regionIDs { @@ -250,10 +275,29 @@ func (e *SplitTableRegionExec) Next(ctx context.Context, _ *chunk.Chunk) error { zap.String("table", e.tableInfo.Name.L), zap.Error(err)) } + + failpoint.Inject("mockScatterRegionTimeout", func(val failpoint.Value) { + if val.(bool) { + time.Sleep(time.Second * 1) + } + }) + + if isCtxDone(ctxWithTimeout) { + return errors.Errorf("wait split region scatter timeout(%v)", e.ctx.GetSessionVars().GetSplitRegionTimeout()) + } } return nil } +func isCtxDone(ctx context.Context) bool { + select { + case <-ctx.Done(): + return true + default: + return false + } +} + var minRegionStepValue = uint64(1000) func (e *SplitTableRegionExec) getSplitTableKeys() ([][]byte, error) { diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 045445b9300aa..028c5b6c79c09 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -342,8 +342,11 @@ type SessionVars struct { // DDLReorgPriority is the operation priority of adding indices. DDLReorgPriority int - // WaitTableSplitFinish defines the create table pre-split behaviour is sync or async. - WaitTableSplitFinish bool + // WaitSplitRegionFinish defines the split region behaviour is sync or async. + WaitSplitRegionFinish bool + + // WaitSplitRegionTimeout defines the split region timeout. + WaitSplitRegionTimeout uint64 // EnableStreaming indicates whether the coprocessor request can use streaming API. // TODO: remove this after tidb-server configuration "enable-streaming' removed. @@ -435,6 +438,8 @@ func NewSessionVars() *SessionVars { CommandValue: uint32(mysql.ComSleep), TiDBOptJoinReorderThreshold: DefTiDBOptJoinReorderThreshold, SlowQueryFile: config.GetGlobalConfig().Log.SlowQueryFile, + WaitSplitRegionFinish: DefTiDBWaitSplitRegionFinish, + WaitSplitRegionTimeout: DefWaitSplitRegionTimeout, } vars.Concurrency = Concurrency{ IndexLookupConcurrency: DefIndexLookupConcurrency, @@ -479,6 +484,11 @@ func (s *SessionVars) GetWriteStmtBufs() *WriteStmtBufs { return &s.writeStmtBufs } +// GetSplitRegionTimeout gets split region timeout. +func (s *SessionVars) GetSplitRegionTimeout() time.Duration { + return time.Duration(s.WaitSplitRegionTimeout) * time.Second +} + // CleanBuffers cleans the temporary bufs func (s *SessionVars) CleanBuffers() { if !s.LightningMode { @@ -798,8 +808,10 @@ func (s *SessionVars) SetSystemVar(name string, val string) error { s.SlowQueryFile = val case TiDBEnableFastAnalyze: s.EnableFastAnalyze = TiDBOptOn(val) - case TiDBWaitTableSplitFinish: - s.WaitTableSplitFinish = TiDBOptOn(val) + case TiDBWaitSplitRegionFinish: + s.WaitSplitRegionFinish = TiDBOptOn(val) + case TiDBWaitSplitRegionTimeout: + s.WaitSplitRegionTimeout = uint64(tidbOptPositiveInt32(val, DefWaitSplitRegionTimeout)) case TiDBExpensiveQueryTimeThreshold: atomic.StoreUint64(&ExpensiveQueryTimeThreshold, uint64(tidbOptPositiveInt32(val, DefTiDBExpensiveQueryTimeThreshold))) case TiDBTxnMode: diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index fdbc116d2c3ef..e7b4bfb6d5d15 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -695,7 +695,8 @@ var defaultSysVars = []*SysVar{ {ScopeGlobal | ScopeSession, TiDBOptJoinReorderThreshold, strconv.Itoa(DefTiDBOptJoinReorderThreshold)}, {ScopeSession, TiDBCheckMb4ValueInUTF8, BoolToIntStr(config.GetGlobalConfig().CheckMb4ValueInUTF8)}, {ScopeSession, TiDBSlowQueryFile, ""}, - {ScopeSession, TiDBWaitTableSplitFinish, BoolToIntStr(DefTiDBWaitTableSplitFinish)}, + {ScopeSession, TiDBWaitSplitRegionFinish, BoolToIntStr(DefTiDBWaitSplitRegionFinish)}, + {ScopeSession, TiDBWaitSplitRegionTimeout, strconv.Itoa(DefWaitSplitRegionTimeout)}, {ScopeSession, TiDBLowResolutionTSO, "0"}, {ScopeSession, TiDBExpensiveQueryTimeThreshold, strconv.Itoa(DefTiDBExpensiveQueryTimeThreshold)}, } diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 5df9769b845ef..8ebd523d26e82 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -244,8 +244,11 @@ const ( // It can be: PRIORITY_LOW, PRIORITY_NORMAL, PRIORITY_HIGH TiDBDDLReorgPriority = "tidb_ddl_reorg_priority" - // TiDBWaitTableSplitFinish defines the create table pre-split behaviour is sync or async. - TiDBWaitTableSplitFinish = "tidb_wait_table_split_finish" + // TiDBWaitSplitRegionFinish defines the split region behaviour is sync or async. + TiDBWaitSplitRegionFinish = "tidb_wait_split_region_finish" + + // TiDBWaitSplitRegionTimeout uses to set the split and scatter region back off time. + TiDBWaitSplitRegionTimeout = "tidb_wait_split_region_timeout" // tidb_force_priority defines the operations priority of all statements. // It can be "NO_PRIORITY", "LOW_PRIORITY", "HIGH_PRIORITY", "DELAYED" @@ -333,8 +336,9 @@ const ( DefTiDBDDLSlowOprThreshold = 300 DefTiDBUseFastAnalyze = false DefTiDBSkipIsolationLevelCheck = false - DefTiDBWaitTableSplitFinish = false - DefTiDBExpensiveQueryTimeThreshold = 60 // 60s + DefTiDBWaitSplitRegionFinish = true + DefTiDBExpensiveQueryTimeThreshold = 60 // 60s + DefWaitSplitRegionTimeout = 300 // 300s ) // Process global variables. diff --git a/sessionctx/variable/varsutil.go b/sessionctx/variable/varsutil.go index 316766287b3a1..2207ce3060ac5 100644 --- a/sessionctx/variable/varsutil.go +++ b/sessionctx/variable/varsutil.go @@ -550,6 +550,14 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string, if v < 0 || v >= 64 { return value, errors.Errorf("tidb_join_order_algo_threshold(%d) cannot be smaller than 0 or larger than 63", v) } + case TiDBWaitSplitRegionTimeout: + v, err := strconv.Atoi(value) + if err != nil { + return value, ErrWrongTypeForVar.GenWithStackByArgs(name) + } + if v <= 0 { + return value, errors.Errorf("tidb_wait_split_region_timeout(%d) cannot be smaller than 1", v) + } } return value, nil }