Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: add table option pre_split_regions for pre-split region when create table with shard_row_id_bits. (#10138) #10863

Merged
merged 2 commits into from
Jun 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1169,6 +1169,14 @@ func (d *ddl) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt) (err e

err = d.doDDLJob(ctx, job)
if err == nil {
// do pre-split and scatter.
if tbInfo.ShardRowIDBits > 0 && tbInfo.PreSplitRegions > 0 {
if ctx.GetSessionVars().WaitTableSplitFinish {
preSplitTableRegion(d.store, tbInfo, true)
} else {
go preSplitTableRegion(d.store, tbInfo, false)
}
}
if tbInfo.AutoIncID > 1 {
// Default tableAutoIncID base is 0.
// If the first ID is expected to greater than 1, we need to do rebase.
Expand Down Expand Up @@ -1247,9 +1255,13 @@ func handleTableOptions(options []*ast.TableOption, tbInfo *model.TableInfo) err
tbInfo.ShardRowIDBits = shardRowIDBitsMax
}
tbInfo.MaxShardRowIDBits = tbInfo.ShardRowIDBits
case ast.TableOptionPreSplitRegion:
tbInfo.PreSplitRegions = op.UintValue
}
}

if tbInfo.PreSplitRegions > tbInfo.ShardRowIDBits {
tbInfo.PreSplitRegions = tbInfo.ShardRowIDBits
}
return nil
}

Expand Down
69 changes: 69 additions & 0 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ func onDropTable(t *meta.Meta, job *model.Job) (ver int64, _ error) {

type splitableStore interface {
SplitRegion(splitKey kv.Key) error
SplitRegionAndScatter(splitKey kv.Key) (uint64, error)
WaitScatterRegionFinish(regionID uint64) error
}

func splitTableRegion(store kv.Storage, tableID int64) {
Expand All @@ -176,6 +178,73 @@ func splitTableRegion(store kv.Storage, tableID int64) {
}
}

func preSplitTableRegion(store kv.Storage, tblInfo *model.TableInfo, waitTableSplitFinish bool) {
s, ok := store.(splitableStore)
if !ok {
return
}
regionIDs := make([]uint64, 0, 1<<(tblInfo.PreSplitRegions-1)+len(tblInfo.Indices))

// Example:
// ShardRowIDBits = 5
// PreSplitRegions = 3
//
// then will pre-split 2^(3-1) = 4 regions.
//
// in this code:
// max = 1 << (tblInfo.ShardRowIDBits - 1) = 1 << (5-1) = 16
// step := int64(1 << (tblInfo.ShardRowIDBits - tblInfo.PreSplitRegions)) = 1 << (5-3) = 4;
//
// then split regionID is below:
// 4 << 59 = 2305843009213693952
// 8 << 59 = 4611686018427387904
// 12 << 59 = 6917529027641081856
//
// The 4 pre-split regions range is below:
// 0 ~ 2305843009213693952
// 2305843009213693952 ~ 4611686018427387904
// 4611686018427387904 ~ 6917529027641081856
// 6917529027641081856 ~ 9223372036854775807 ( (1 << 63) - 1 )
//
// And the max _tidb_rowid is 9223372036854775807, it won't be negative number.

// Split table region.
step := int64(1 << (tblInfo.ShardRowIDBits - tblInfo.PreSplitRegions))
// The highest bit is the symbol bit,and alloc _tidb_rowid will always be positive number.
// So we only need to split the region for the positive number.
max := int64(1 << (tblInfo.ShardRowIDBits - 1))
for p := int64(step); p < max; p += step {
recordID := p << (64 - tblInfo.ShardRowIDBits)
recordPrefix := tablecodec.GenTableRecordPrefix(tblInfo.ID)
key := tablecodec.EncodeRecordKey(recordPrefix, recordID)
regionID, err := s.SplitRegionAndScatter(key)
if err != nil {
logutil.Logger(ddlLogCtx).Warn("[ddl] pre split table region failed", zap.Int64("recordID", recordID), zap.Error(err))
} else {
regionIDs = append(regionIDs, regionID)
}
}
// Split index region.
for _, idx := range tblInfo.Indices {
indexPrefix := tablecodec.EncodeTableIndexPrefix(tblInfo.ID, idx.ID)
regionID, err := s.SplitRegionAndScatter(indexPrefix)
if err != nil {
logutil.Logger(ddlLogCtx).Warn("[ddl] pre split table index region failed", zap.String("index", idx.Name.L), zap.Error(err))
} else {
regionIDs = append(regionIDs, regionID)
}
}
if !waitTableSplitFinish {
return
}
for _, regionID := range regionIDs {
err := s.WaitScatterRegionFinish(regionID)
if err != nil {
logutil.Logger(ddlLogCtx).Warn("[ddl] wait scatter region failed", zap.Uint64("regionID", regionID), zap.Error(err))
}
}
}

func getTable(store kv.Storage, schemaID int64, tblInfo *model.TableInfo) (table.Table, error) {
alloc := autoid.NewAllocator(store, tblInfo.GetDBID(schemaID), tblInfo.IsAutoIncColUnsigned())
tbl, err := table.TableFromMeta(alloc, tblInfo)
Expand Down
7 changes: 7 additions & 0 deletions executor/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,13 @@ func (s *testSuite) TestSetVar(c *C) {
tk.MustQuery(`select @@session.tidb_constraint_check_in_place;`).Check(testkit.Rows("1"))
tk.MustExec("set global tidb_constraint_check_in_place = 0")
tk.MustQuery(`select @@global.tidb_constraint_check_in_place;`).Check(testkit.Rows("0"))

// test for tidb_wait_table_split_finish
tk.MustQuery(`select @@session.tidb_wait_table_split_finish;`).Check(testkit.Rows("0"))
tk.MustExec("set tidb_wait_table_split_finish = 1")
tk.MustQuery(`select @@session.tidb_wait_table_split_finish;`).Check(testkit.Rows("1"))
tk.MustExec("set tidb_wait_table_split_finish = 0")
tk.MustQuery(`select @@session.tidb_wait_table_split_finish;`).Check(testkit.Rows("0"))
}

func (s *testSuite) TestSetCharset(c *C) {
Expand Down
6 changes: 5 additions & 1 deletion executor/show.go
Original file line number Diff line number Diff line change
Expand Up @@ -683,7 +683,11 @@ func (e *ShowExec) fetchShowCreateTable() error {
}

if tb.Meta().ShardRowIDBits > 0 {
buf.WriteString(fmt.Sprintf("/*!90000 SHARD_ROW_ID_BITS=%d */", tb.Meta().ShardRowIDBits))
fmt.Fprintf(&buf, "/*!90000 SHARD_ROW_ID_BITS=%d ", tb.Meta().ShardRowIDBits)
if tb.Meta().PreSplitRegions > 0 {
fmt.Fprintf(&buf, "PRE_SPLIT_REGIONS=%d ", tb.Meta().PreSplitRegions)
}
buf.WriteString("*/")
}

if len(tb.Meta().Comment) > 0 {
Expand Down
10 changes: 10 additions & 0 deletions executor/show_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -803,6 +803,16 @@ func (s *testSuite) TestShowCreateTable(c *C) {
" PARTITION p11 VALUES LESS THAN (12),\n"+
" PARTITION p12 VALUES LESS THAN (MAXVALUE)\n"+
")"))

tk.MustExec("create table t (a int, b int) shard_row_id_bits = 4 pre_split_regions=3;")
tk.MustQuery("show create table `t`").Check(testutil.RowsWithSep("|",
""+
"t CREATE TABLE `t` (\n"+
" `a` int(11) DEFAULT NULL,\n"+
" `b` int(11) DEFAULT NULL\n"+
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin/*!90000 SHARD_ROW_ID_BITS=4 PRE_SPLIT_REGIONS=3 */",
))
tk.MustExec("drop table t")
}

func (s *testSuite) TestShowEscape(c *C) {
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ require (
github.com/pingcap/errors v0.11.1
github.com/pingcap/failpoint v0.0.0-20190430075617-bf45ab20bfc4
github.com/pingcap/goleveldb v0.0.0-20171020084629-8d44bfdf1030
github.com/pingcap/kvproto v0.0.0-20190226063853-f6c0b7ffff11
github.com/pingcap/kvproto v0.0.0-20190429124202-32a5ba2af0f7
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596
github.com/pingcap/parser v0.0.0-20190612082136-d4cf758dcb7f
github.com/pingcap/pd v2.1.0-rc.4+incompatible
github.com/pingcap/parser v0.0.0-20190619113912-b634efe14a7a
github.com/pingcap/pd v2.1.12+incompatible
github.com/pingcap/tidb-tools v2.1.3-0.20190116051332-34c808eef588+incompatible
github.com/pingcap/tipb v0.0.0-20180910045846-371b48b15d93
github.com/prometheus/client_golang v0.8.0
Expand Down
7 changes: 7 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,19 @@ github.com/pingcap/goleveldb v0.0.0-20171020084629-8d44bfdf1030 h1:XJLuW0lsP7vAt
github.com/pingcap/goleveldb v0.0.0-20171020084629-8d44bfdf1030/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20190226063853-f6c0b7ffff11 h1:e81flSfRbbMW5RUnz1cJl+8XKOVUCfF8FapFS8HnHLs=
github.com/pingcap/kvproto v0.0.0-20190226063853-f6c0b7ffff11/go.mod h1:0gwbe1F2iBIjuQ9AH0DbQhL+Dpr5GofU8fgYyXk+ykk=
github.com/pingcap/kvproto v0.0.0-20190429124202-32a5ba2af0f7 h1:+wEqJTc74Jvoxen3SEeT5NDgiSUXSEi6CxG2YzpmDJU=
github.com/pingcap/kvproto v0.0.0-20190429124202-32a5ba2af0f7/go.mod h1:0gwbe1F2iBIjuQ9AH0DbQhL+Dpr5GofU8fgYyXk+ykk=
github.com/pingcap/kvproto v0.0.0-20190619024611-a4759dfe3753 h1:92t0y430CJF0tN1lvUhP5fhnYTFmssATJqwxQtvixYU=
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596 h1:t2OQTpPJnrPDGlvA+3FwJptMTt6MEPdzK1Wt99oaefQ=
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
github.com/pingcap/parser v0.0.0-20190612082136-d4cf758dcb7f h1:8uWFPaemhZ6CE3dr7hSziAYD+OZ0yuHtK1ziAiWn4sQ=
github.com/pingcap/parser v0.0.0-20190612082136-d4cf758dcb7f/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA=
github.com/pingcap/parser v0.0.0-20190619113912-b634efe14a7a h1:1TKjrAf3ddVOMjmQ4FY329DGPPjTJSiQL8lh/k51tqA=
github.com/pingcap/parser v0.0.0-20190619113912-b634efe14a7a/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA=
github.com/pingcap/pd v2.1.0-rc.4+incompatible h1:/buwGk04aHO5odk/+O8ZOXGs4qkUjYTJ2UpCJXna8NE=
github.com/pingcap/pd v2.1.0-rc.4+incompatible/go.mod h1:nD3+EoYes4+aNNODO99ES59V83MZSI+dFbhyr667a0E=
github.com/pingcap/pd v2.1.12+incompatible h1:6N3LBxx2aSZqT+IWEG730EDNDttP7dXO8J6yvBh+HXw=
github.com/pingcap/pd v2.1.12+incompatible/go.mod h1:nD3+EoYes4+aNNODO99ES59V83MZSI+dFbhyr667a0E=
github.com/pingcap/tidb-tools v2.1.3-0.20190116051332-34c808eef588+incompatible h1:e9Gi/LP9181HT3gBfSOeSBA+5JfemuE4aEAhqNgoE4k=
github.com/pingcap/tidb-tools v2.1.3-0.20190116051332-34c808eef588+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
github.com/pingcap/tipb v0.0.0-20180910045846-371b48b15d93 h1:gI5bOzLMxjUq6ui+md/JnT4pYpkzrABJ/PeYORWiYYs=
Expand Down
5 changes: 5 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,9 @@ type SessionVars struct {
// DDLReorgPriority is the operation priority of adding indices.
DDLReorgPriority int

// WaitTableSplitFinish defines the create table pre-split behaviour is sync or async.
WaitTableSplitFinish bool

// EnableStreaming indicates whether the coprocessor request can use streaming API.
// TODO: remove this after tidb-server configuration "enable-streaming' removed.
EnableStreaming bool
Expand Down Expand Up @@ -643,6 +646,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
config.GetGlobalConfig().CheckMb4ValueInUTF8 = TiDBOptOn(val)
case TiDBSlowQueryFile:
s.SlowQueryFile = val
case TiDBWaitTableSplitFinish:
s.WaitTableSplitFinish = TiDBOptOn(val)
}
s.systems[name] = val
return nil
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,7 @@ var defaultSysVars = []*SysVar{
{ScopeSession, TiDBForcePriority, mysql.Priority2Str[DefTiDBForcePriority]},
{ScopeSession, TiDBCheckMb4ValueInUTF8, BoolToIntStr(config.GetGlobalConfig().CheckMb4ValueInUTF8)},
{ScopeSession, TiDBSlowQueryFile, ""},
{ScopeSession, TiDBWaitTableSplitFinish, BoolToIntStr(DefTiDBWaitTableSplitFinish)},
}

// SynonymsSysVariables is synonyms of system variables.
Expand Down
4 changes: 4 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,9 @@ const (
// It can be: PRIORITY_LOW, PRIORITY_NORMAL, PRIORITY_HIGH
TiDBDDLReorgPriority = "tidb_ddl_reorg_priority"

// TiDBWaitTableSplitFinish defines the create table pre-split behaviour is sync or async.
TiDBWaitTableSplitFinish = "tidb_wait_table_split_finish"

// tidb_force_priority defines the operations priority of all statements.
// It can be "NO_PRIORITY", "LOW_PRIORITY", "HIGH_PRIORITY", "DELAYED"
TiDBForcePriority = "tidb_force_priority"
Expand Down Expand Up @@ -258,6 +261,7 @@ const (
DefTiDBHashAggPartialConcurrency = 4
DefTiDBHashAggFinalConcurrency = 4
DefTiDBForcePriority = mysql.NoPriority
DefTiDBWaitTableSplitFinish = false
)

// Process global variables.
Expand Down
11 changes: 10 additions & 1 deletion store/mockstore/mocktikv/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"time"

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/pd/client"
"golang.org/x/net/context"
)
Expand Down Expand Up @@ -96,7 +97,7 @@ func (c *pdClient) GetStore(ctx context.Context, storeID uint64) (*metapb.Store,
return store, nil
}

func (c *pdClient) GetAllStores(ctx context.Context) ([]*metapb.Store, error) {
func (c *pdClient) GetAllStores(ctx context.Context, opts ...pd.GetStoreOption) ([]*metapb.Store, error) {
panic("unimplemented")
}

Expand All @@ -106,3 +107,11 @@ func (c *pdClient) UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uin

func (c *pdClient) Close() {
}

func (c *pdClient) ScatterRegion(ctx context.Context, regionID uint64) error {
return nil
}

func (c *pdClient) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error) {
return &pdpb.GetOperatorResponse{Status: pdpb.OperatorStatus_SUCCESS}, nil
}
2 changes: 2 additions & 0 deletions store/tikv/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ const (
deleteRangeOneRegionMaxBackoff = 100000
rawkvMaxBackoff = 20000
splitRegionBackoff = 20000
scatterRegionBackoff = 20000
waitScatterRegionFinishBackoff = 120000
)

// CommitMaxBackoff is max sleep time of the 'commit' command
Expand Down
Loading