Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: dynamic adjust add index worker number. #8295

Merged
merged 28 commits into from
Dec 24, 2018
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
a5c7024
ddl: dynamic adjust add index worker number init
crazycs520 Nov 13, 2018
00ad984
shrink worker num
crazycs520 Nov 14, 2018
f936c59
add index
crazycs520 Nov 14, 2018
c7fd5ee
refine test
crazycs520 Nov 14, 2018
86d6b99
Merge branch 'master' of https://github.com/pingcap/tidb into adjust-…
crazycs520 Nov 14, 2018
b1cbd7c
refine test
crazycs520 Nov 14, 2018
fe27282
Merge branch 'master' of https://github.com/pingcap/tidb into adjust-…
crazycs520 Nov 15, 2018
4545ebc
shrink worker num if regions is less then worker num
crazycs520 Nov 19, 2018
e05f9b7
Merge branch 'master' of https://github.com/pingcap/tidb into adjust-…
crazycs520 Nov 19, 2018
3033df1
add comment
crazycs520 Nov 20, 2018
d11e016
Merge branch 'master' of https://github.com/pingcap/tidb into adjust-…
crazycs520 Dec 5, 2018
fb85c9b
add test to check change add index worker num take effect.
crazycs520 Dec 5, 2018
a2bdc3e
refine test
crazycs520 Dec 5, 2018
f2aa994
Merge branch 'master' of https://github.com/pingcap/tidb into adjust-…
crazycs520 Dec 6, 2018
73cb88b
add log
crazycs520 Dec 7, 2018
cd97603
Merge branch 'master' of https://github.com/pingcap/tidb into adjust-…
crazycs520 Dec 7, 2018
d6c57b0
Merge branch 'master' of https://github.com/pingcap/tidb into adjust-…
crazycs520 Dec 12, 2018
c594dda
address comment
crazycs520 Dec 17, 2018
8d90b29
Merge branch 'master' of https://github.com/pingcap/tidb into adjust-…
crazycs520 Dec 17, 2018
216c58f
fix test
crazycs520 Dec 17, 2018
451a9da
Merge branch 'master' of https://github.com/pingcap/tidb into adjust-…
crazycs520 Dec 21, 2018
77f49bd
use gofail test
crazycs520 Dec 21, 2018
dfa3e4d
Merge branch 'master' of https://github.com/pingcap/tidb into adjust-…
crazycs520 Dec 21, 2018
d60f065
refine gofail test and remove hook
crazycs520 Dec 21, 2018
50470ad
refine test
crazycs520 Dec 21, 2018
fc6674e
Merge branch 'master' of https://github.com/pingcap/tidb into adjust-…
crazycs520 Dec 21, 2018
8b2ab52
add comment
crazycs520 Dec 22, 2018
34eafec
Merge branch 'master' into adjust-add-index-worker
crazycs520 Dec 24, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions ddl/callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ type Callback interface {
OnJobUpdated(job *model.Job)
// OnWatched is called after watching owner is completed.
OnWatched(ctx context.Context)
// OnIndexWorkerReorgBefore is called before the add index workers start running the reorganization.
OnIndexWorkerReorgBefore(workerNum, rangesNum int)
}

// BaseCallback implements Callback.OnChanged interface.
Expand All @@ -70,3 +72,8 @@ func (c *BaseCallback) OnJobUpdated(job *model.Job) {
func (c *BaseCallback) OnWatched(ctx context.Context) {
// Nothing to do.
}

// OnIndexWorkerReorgBefore implements Callback.OnIndexWorkerReorgBefore interface.
// This base implementation is a no-op: workerNum and rangesNum are accepted but
// not used. Types that embed *BaseCallback can define their own method to
// observe these values.
func (c *BaseCallback) OnIndexWorkerReorgBefore(workerNum, rangesNum int) {
// Nothing to do.
}
20 changes: 15 additions & 5 deletions ddl/callback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,12 @@ func (ti *TestInterceptor) OnGetInfoSchema(ctx sessionctx.Context, is infoschema
type TestDDLCallback struct {
*BaseCallback

onJobRunBefore func(*model.Job)
OnJobRunBeforeExported func(*model.Job)
onJobUpdated func(*model.Job)
OnJobUpdatedExported func(*model.Job)
onWatched func(ctx context.Context)
onJobRunBefore func(*model.Job)
OnJobRunBeforeExported func(*model.Job)
onJobUpdated func(*model.Job)
OnJobUpdatedExported func(*model.Job)
onWatched func(ctx context.Context)
OnIndexWorkerReorgBeforeExported func(workerNum, rangesNum int)
}

func (tc *TestDDLCallback) OnJobRunBefore(job *model.Job) {
Expand Down Expand Up @@ -84,6 +85,15 @@ func (tc *TestDDLCallback) OnWatched(ctx context.Context) {
tc.BaseCallback.OnWatched(ctx)
}

// OnIndexWorkerReorgBefore dispatches to OnIndexWorkerReorgBeforeExported when
// that hook has been set; otherwise it falls back to the embedded BaseCallback
// implementation. Both arguments are passed through unchanged.
func (tc *TestDDLCallback) OnIndexWorkerReorgBefore(workerNum, rangesNum int) {
	exported := tc.OnIndexWorkerReorgBeforeExported
	if exported == nil {
		// No test hook installed: use the default behavior.
		tc.BaseCallback.OnIndexWorkerReorgBefore(workerNum, rangesNum)
		return
	}

	exported(workerNum, rangesNum)
}

func (s *testDDLSuite) TestCallback(c *C) {
cb := &BaseCallback{}
c.Assert(cb.OnChanged(nil), IsNil)
Expand Down
62 changes: 59 additions & 3 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/mockstore/mocktikv"
"github.com/pingcap/tidb/table"
Expand Down Expand Up @@ -656,16 +657,64 @@ func (s *testDBSuite) testAddIndex(c *C, testPartition bool, createTableSQL stri
s.mustExec(c, sql)
otherKeys = append(otherKeys, v)

is := s.dom.InfoSchema()
schemaName := model.NewCIStr(s.schemaName)
tableName := model.NewCIStr("test_add_index")
tbl, err := is.TableByName(schemaName, tableName)
c.Assert(err, IsNil)

splitCount := 100
if !testPartition {
// Split table to multi region.
s.cluster.SplitTable(s.mvccStore, tbl.Meta().ID, splitCount)
}

// Set a hook to check the add index worker number.
hook := &ddl.TestDDLCallback{}
oringDDLAddIndexWorkerCnt := variable.GetDDLReorgWorkerCounter()
lastSetWorkerCnt := int(oringDDLAddIndexWorkerCnt)
defer variable.SetDDLReorgWorkerCounter(oringDDLAddIndexWorkerCnt)
// firstCheck is used to check that splitting the table into ranges has taken effect.
firstCheck := !testPartition
var checkErr error
changeWorkerNumEnable := false
hook.OnIndexWorkerReorgBeforeExported = func(workerNum, rangesNum int) {
if checkErr != nil {
return
}
// Check split table range is successful.
if firstCheck {
firstCheck = false
if rangesNum != splitCount {
checkErr = errors.Errorf("first check rangeNum, expect: %v, but got: %v", 100, rangesNum)
}
}
setNum := int(variable.GetDDLReorgWorkerCounter())
if rangesNum < setNum {
if workerNum != rangesNum {
checkErr = errors.Errorf("rangeNum is %v, expect workerNum is: %v, but got: %v", rangesNum, rangesNum, workerNum)
}
} else if workerNum != setNum {
checkErr = errors.Errorf("rangeNum is %v, expect workerNum is: %v, but got: %v", rangesNum, setNum, workerNum)
}
changeWorkerNumEnable = true
}
s.dom.DDL().(ddl.DDLForTest).SetHook(hook)
defer s.dom.DDL().(ddl.DDLForTest).SetHook(&ddl.TestDDLCallback{})
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved

sessionExecInGoroutine(c, s.store, "create index c3_index on test_add_index (c3)", done)

deletedKeys := make(map[int]struct{})

ticker := time.NewTicker(s.lease / 2)
defer ticker.Stop()
ticker2 := time.NewTicker(10 * time.Millisecond)
defer ticker2.Stop()

LOOP:
for {
select {
case err := <-done:
case err = <-done:
if err == nil {
break LOOP
}
Expand All @@ -688,8 +737,16 @@ LOOP:
s.mustExec(c, sql)
}
num += step
case <-ticker2.C:
if changeWorkerNumEnable {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will have data race.

lastSetWorkerCnt = rand.Intn(8) + 8
s.mustExec(c, fmt.Sprintf("set @@tidb_ddl_reorg_worker_cnt=%d", lastSetWorkerCnt))
c.Assert(checkErr, IsNil)
changeWorkerNumEnable = false
}
}
}
c.Assert(checkErr, IsNil)

// get exists keys
keys := make([]int, 0, num)
Expand Down Expand Up @@ -733,7 +790,7 @@ LOOP:
t := s.testGetTable(c, "test_add_index")
handles := make(map[int64]struct{})
startKey := t.RecordKey(math.MinInt64)
err := t.IterRecords(ctx, startKey, t.Cols(),
err = t.IterRecords(ctx, startKey, t.Cols(),
func(h int64, data []types.Datum, cols []*table.Column) (bool, error) {
handles[h] = struct{}{}
return true, nil
Expand Down Expand Up @@ -772,7 +829,6 @@ LOOP:
delete(handles, h)
}
c.Assert(handles, HasLen, 0)

s.tk.MustExec("drop table test_add_index")
}

Expand Down
60 changes: 41 additions & 19 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1037,18 +1037,56 @@ func (w *worker) sendRangeTaskToWorkers(t table.Table, workers []*addIndexWorker

// buildIndexForReorgInfo build backfilling tasks from [reorgInfo.StartHandle, reorgInfo.EndHandle),
// and send these tasks to add index workers, till we finish adding the indices.
func (w *worker) buildIndexForReorgInfo(t table.PhysicalTable, workers []*addIndexWorker, job *model.Job, reorgInfo *reorgInfo) error {
func (w *worker) buildIndexForReorgInfo(t table.PhysicalTable, indexInfo *model.IndexInfo, job *model.Job, reorgInfo *reorgInfo) error {
totalAddedCount := job.GetRowCount()

startHandle, endHandle := reorgInfo.StartHandle, reorgInfo.EndHandle
sessCtx := newContext(reorgInfo.d.store)
decodeColMap, err := makeupDecodeColMap(sessCtx, t, indexInfo)
if err != nil {
return errors.Trace(err)
}

// variable.ddlReorgWorkerCounter can be modified by system variable "tidb_ddl_reorg_worker_cnt".
workerCnt := variable.GetDDLReorgWorkerCounter()
idxWorkers := make([]*addIndexWorker, 0, workerCnt)
defer func() {
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
closeAddIndexWorkers(idxWorkers)
}()

for {
kvRanges, err := splitTableRanges(t, reorgInfo.d.store, startHandle, endHandle)
if err != nil {
return errors.Trace(err)
}

// Re-read the setting on every round so the add index worker number can be adjusted dynamically.
workerCnt = variable.GetDDLReorgWorkerCounter()
// Never use more workers than there are ranges: e.g. if there is only 1 range, we can only start 1 worker.
if len(kvRanges) < int(workerCnt) {
workerCnt = int32(len(kvRanges))
}
// Enlarge the worker size.
for i := len(idxWorkers); i < int(workerCnt); i++ {
sessCtx := newContext(reorgInfo.d.store)
idxWorker := newAddIndexWorker(sessCtx, w, i, t, indexInfo, decodeColMap)
idxWorker.priority = job.Priority
idxWorkers = append(idxWorkers, idxWorker)
go idxWorkers[i].run(reorgInfo.d)
}
// Shrink the worker size.
if len(idxWorkers) > int(workerCnt) {
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
workers := idxWorkers[workerCnt:]
idxWorkers = idxWorkers[:workerCnt]
closeAddIndexWorkers(workers)
}

reorgInfo.d.mu.Lock()
reorgInfo.d.mu.hook.OnIndexWorkerReorgBefore(len(idxWorkers), len(kvRanges))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we use gofail instead of this logic?

reorgInfo.d.mu.Unlock()

log.Infof("[ddl-reorg] start to reorg index of %v region ranges, handle range:[%v, %v).", len(kvRanges), startHandle, endHandle)
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
remains, err := w.sendRangeTaskToWorkers(t, workers, reorgInfo, &totalAddedCount, kvRanges)
remains, err := w.sendRangeTaskToWorkers(t, idxWorkers, reorgInfo, &totalAddedCount, kvRanges)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -1081,23 +1119,7 @@ func (w *worker) buildIndexForReorgInfo(t table.PhysicalTable, workers []*addInd
func (w *worker) addPhysicalTableIndex(t table.PhysicalTable, indexInfo *model.IndexInfo, reorgInfo *reorgInfo) error {
job := reorgInfo.Job
log.Infof("[ddl-reorg] addTableIndex, job:%s, reorgInfo:%#v", job, reorgInfo)
sessCtx := newContext(reorgInfo.d.store)
decodeColMap, err := makeupDecodeColMap(sessCtx, t, indexInfo)
if err != nil {
return errors.Trace(err)
}

// variable.ddlReorgWorkerCounter can be modified by system variable "tidb_ddl_reorg_worker_cnt".
workerCnt := variable.GetDDLReorgWorkerCounter()
idxWorkers := make([]*addIndexWorker, workerCnt)
for i := 0; i < int(workerCnt); i++ {
sessCtx := newContext(reorgInfo.d.store)
idxWorkers[i] = newAddIndexWorker(sessCtx, w, i, t, indexInfo, decodeColMap)
idxWorkers[i].priority = job.Priority
go idxWorkers[i].run(reorgInfo.d)
}
defer closeAddIndexWorkers(idxWorkers)
err = w.buildIndexForReorgInfo(t, idxWorkers, job, reorgInfo)
err := w.buildIndexForReorgInfo(t, indexInfo, job, reorgInfo)
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
return errors.Trace(err)
}

Expand Down