diff --git a/ddl/BUILD.bazel b/ddl/BUILD.bazel index b5b3a8c4af80c..388bf91f2e7f7 100644 --- a/ddl/BUILD.bazel +++ b/ddl/BUILD.bazel @@ -100,6 +100,7 @@ go_library( "//util/domainutil", "//util/filter", "//util/gcutil", + "//util/generic", "//util/hack", "//util/logutil", "//util/mathutil", diff --git a/ddl/backfilling.go b/ddl/backfilling.go index 55fd5216b289d..35abc16cd1a6f 100644 --- a/ddl/backfilling.go +++ b/ddl/backfilling.go @@ -168,7 +168,7 @@ func (r *reorgBackfillTask) String() string { physicalID := strconv.FormatInt(r.physicalTableID, 10) startKey := hex.EncodeToString(r.startKey) endKey := hex.EncodeToString(r.endKey) - rangeStr := "physicalTableID_" + physicalID + "_" + "[" + startKey + "," + endKey + rangeStr := "taskID_" + strconv.Itoa(r.id) + "_physicalTableID_" + physicalID + "_" + "[" + startKey + "," + endKey if r.endInclude { return rangeStr + "]" } @@ -273,7 +273,7 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask, rc.increaseRowCount(int64(taskCtx.addedCount)) rc.mergeWarnings(taskCtx.warnings, taskCtx.warningsCount) - if num := result.scanCount - lastLogCount; num >= 30000 { + if num := result.scanCount - lastLogCount; num >= 90000 { lastLogCount = result.scanCount logutil.BgLogger().Info("[ddl] backfill worker back fill index", zap.Int("worker ID", w.id), @@ -439,6 +439,9 @@ func (dc *ddlCtx) sendTasksAndWait(scheduler *backfillScheduler, totalAddedCount batchTasks []*reorgBackfillTask) error { reorgInfo := scheduler.reorgInfo for _, task := range batchTasks { + if scheduler.copReqSenderPool != nil { + scheduler.copReqSenderPool.sendTask(task) + } scheduler.taskCh <- task } @@ -605,6 +608,8 @@ type backfillScheduler struct { taskCh chan *reorgBackfillTask resultCh chan *backfillResult + + copReqSenderPool *copReqSenderPool // for add index in ingest way. } const backfillTaskChanSize = 1024 @@ -658,6 +663,7 @@ func (b *backfillScheduler) workerSize() int { } func (b *backfillScheduler) adjustWorkerSize() error { + b.initCopReqSenderPool() reorgInfo := b.reorgInfo job := reorgInfo.Job jc := b.jobCtx @@ -665,7 +671,11 @@ func (b *backfillScheduler) adjustWorkerSize() error { logutil.BgLogger().Error("[ddl] load DDL reorganization variable failed", zap.Error(err)) } workerCnt := int(variable.GetDDLReorgWorkerCounter()) - workerCnt = mathutil.Min(workerCnt, b.maxSize) + if b.copReqSenderPool != nil { + workerCnt = mathutil.Min(workerCnt/2+1, b.maxSize) + } else { + workerCnt = mathutil.Min(workerCnt, b.maxSize) + } // Increase the worker. for i := len(b.workers); i < workerCnt; i++ { sessCtx, err := b.newSessCtx() @@ -680,8 +690,12 @@ func (b *backfillScheduler) adjustWorkerSize() error { case typeAddIndexWorker: idxWorker, err := newAddIndexWorker(sessCtx, i, b.tbl, b.decodeColMap, reorgInfo, jc, job) if err != nil { - return b.handleCreateBackfillWorkerErr(err) + if b.canSkipError(err) { + continue + } + return err } + idxWorker.copReqSenderPool = b.copReqSenderPool worker, runner = idxWorker, idxWorker.backfillWorker case typeAddIndexMergeTmpWorker: tmpIdxWorker := newMergeTempIndexWorker(sessCtx, i, b.tbl, reorgInfo, jc) @@ -708,20 +722,56 @@ func (b *backfillScheduler) adjustWorkerSize() error { b.workers = b.workers[:workerCnt] closeBackfillWorkers(workers) } + if b.copReqSenderPool != nil { + b.copReqSenderPool.adjustSize(len(b.workers)) + } return injectCheckBackfillWorkerNum(len(b.workers)) } -func (b *backfillScheduler) handleCreateBackfillWorkerErr(err error) error { - if len(b.workers) == 0 { - return errors.Trace(err) +func (b *backfillScheduler) initCopReqSenderPool() { + if b.tp != typeAddIndexWorker || b.reorgInfo.Job.ReorgMeta.ReorgTp != model.ReorgTypeLitMerge || + b.copReqSenderPool != nil || len(b.workers) > 0 { + return + } + indexInfo := model.FindIndexInfoByID(b.tbl.Meta().Indices, b.reorgInfo.currElement.ID) + if indexInfo == nil { + logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender", + zap.Int64("table ID", b.tbl.Meta().ID), zap.Int64("index ID", b.reorgInfo.currElement.ID)) + return + } + sessCtx, err := b.newSessCtx() + if err != nil { + logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender", zap.Error(err)) + return + } + copCtx := newCopContext(b.tbl.Meta(), indexInfo, sessCtx) + if copCtx == nil { + logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender") + return + } + ver, err := sessCtx.GetStore().CurrentVersion(kv.GlobalTxnScope) + if err != nil { + logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender", zap.Error(err)) + return + } + b.copReqSenderPool = newCopReqSenderPool(b.ctx, copCtx, ver.Ver) +} + +func (b *backfillScheduler) canSkipError(err error) bool { + if len(b.workers) > 0 { + // The error can be skipped because the rest workers can handle the tasks. + return true } logutil.BgLogger().Warn("[ddl] create add index backfill worker failed", zap.Int("current worker count", len(b.workers)), zap.Int64("job ID", b.reorgInfo.ID), zap.Error(err)) - return nil + return false } func (b *backfillScheduler) Close() { + if b.copReqSenderPool != nil { + b.copReqSenderPool.close() + } closeBackfillWorkers(b.workers) close(b.taskCh) close(b.resultCh) diff --git a/ddl/export_test.go b/ddl/export_test.go index 071652a642d42..641d7ce72fc8c 100644 --- a/ddl/export_test.go +++ b/ddl/export_test.go @@ -15,7 +15,10 @@ package ddl import ( + "context" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/types" ) @@ -23,14 +26,25 @@ func SetBatchInsertDeleteRangeSize(i int) { batchInsertDeleteRangeSize = i } -var ( - FetchRowsFromCop4Test = fetchRowsFromCop - NewCopContext4Test = newCopContext -) +var NewCopContext4Test = newCopContext -type ( - IndexRecord4Test = *indexRecord -) +func FetchRowsFromCop4Test(copCtx *copContext, startKey, endKey kv.Key, startTS uint64, + batchSize int) ([]*indexRecord, bool, error) { + variable.SetDDLReorgBatchSize(int32(batchSize)) + task := &reorgBackfillTask{ + id: 1, + startKey: startKey, + endKey: endKey, + } + pool := newCopReqSenderPool(context.Background(), copCtx, startTS) + pool.adjustSize(1) + pool.tasksCh <- task + idxRec, _, done, err := pool.fetchRowColValsFromCop(*task) + pool.close() + return idxRec, done, err +} + +type IndexRecord4Test = *indexRecord func (i IndexRecord4Test) GetHandle() kv.Handle { return i.handle diff --git a/ddl/index.go b/ddl/index.go index 83406a5872d0b..c3c3ffd37c926 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -796,6 +796,7 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo if !ok && job.SnapshotVer != 0 { // The owner is crashed or changed, we need to restart the backfill. job.SnapshotVer = 0 + job.RowCount = 0 return false, ver, nil } bc, err = ingest.LitBackCtxMgr.Register(w.ctx, indexInfo.Unique, job.ID, job.ReorgMeta.SQLMode) @@ -1179,9 +1180,9 @@ type baseIndexWorker struct { type addIndexWorker struct { baseIndexWorker - index table.Index - copCtx *copContext - writerCtx *ingest.WriterContext + index table.Index + writerCtx *ingest.WriterContext + copReqSenderPool *copReqSenderPool // The following attributes are used to reduce memory allocation. idxKeyBufs [][]byte @@ -1201,7 +1202,6 @@ func newAddIndexWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable rowDecoder := decoder.NewRowDecoder(t, t.WritableCols(), decodeColMap) var lwCtx *ingest.WriterContext - var copCtx *copContext if job.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge { bc, ok := ingest.LitBackCtxMgr.Load(job.ID) if !ok { @@ -1215,7 +1215,6 @@ func newAddIndexWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable if err != nil { return nil, err } - copCtx = newCopContext(t.Meta(), indexInfo, sessCtx) } return &addIndexWorker{ @@ -1230,7 +1229,6 @@ func newAddIndexWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable jobContext: jc, }, index: index, - copCtx: copCtx, writerCtx: lwCtx, }, nil } @@ -1501,8 +1499,8 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC nextKey kv.Key taskDone bool ) - if w.copCtx != nil { - idxRecords, nextKey, taskDone, err = w.fetchRowColValsFromCop(txn, handleRange) + if w.copReqSenderPool != nil { + idxRecords, nextKey, taskDone, err = w.copReqSenderPool.fetchRowColValsFromCop(handleRange) } else { idxRecords, nextKey, taskDone, err = w.fetchRowColVals(txn, handleRange) } @@ -1567,6 +1565,10 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC taskCtx.addedCount++ } + if w.copReqSenderPool != nil { + w.copReqSenderPool.recycleIdxRecords(idxRecords) + } + return nil }) logSlowOperations(time.Since(oprStartTime), "AddIndexBackfillDataInTxn", 3000) diff --git a/ddl/index_cop.go b/ddl/index_cop.go index dfd261740e7c8..c1229ae1f7a1e 100644 --- a/ddl/index_cop.go +++ b/ddl/index_cop.go @@ -16,6 +16,8 @@ package ddl import ( "context" + "sync" + "time" "github.com/pingcap/errors" "github.com/pingcap/tidb/distsql" @@ -29,41 +31,189 @@ import ( "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/codec" + "github.com/pingcap/tidb/util/generic" + "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/timeutil" "github.com/pingcap/tipb/go-tipb" + "go.uber.org/zap" ) // copReadBatchFactor is the factor of batch size of coprocessor read. // It multiplies the tidb_ddl_reorg_batch_size to avoid sending too many cop requests for the same handle range. const copReadBatchFactor = 10 -func (w *addIndexWorker) fetchRowColValsFromCop(txn kv.Transaction, handleRange reorgBackfillTask) ([]*indexRecord, kv.Key, bool, error) { - w.idxRecords = w.idxRecords[:0] - start, end := handleRange.startKey, handleRange.excludedEndKey() - batchCnt := w.batchCnt * copReadBatchFactor - return fetchRowsFromCop(w.ctx, w.copCtx, start, end, txn.StartTS(), w.idxRecords, batchCnt) +func (c *copReqSenderPool) fetchRowColValsFromCop(handleRange reorgBackfillTask) ([]*indexRecord, kv.Key, bool, error) { + ticker := time.NewTicker(500 * time.Millisecond) + defer ticker.Stop() + for { + select { + case rs, ok := <-c.resultsCh: + if !ok { + logutil.BgLogger().Info("[ddl-ingest] cop-response channel is closed", + zap.Int("id", handleRange.id), zap.String("task", handleRange.String())) + return nil, handleRange.endKey, true, nil + } + if rs.err != nil { + return nil, handleRange.startKey, false, rs.err + } + if rs.done { + logutil.BgLogger().Info("[ddl-ingest] finish a cop-request task", + zap.Int("id", rs.id), zap.Int("total", rs.total)) + c.results.Store(rs.id, struct{}{}) + } + if _, found := c.results.Load(handleRange.id); found { + logutil.BgLogger().Info("[ddl-ingest] task is found in results", + zap.Int("id", handleRange.id), zap.String("task", handleRange.String())) + c.results.Delete(handleRange.id) + return rs.records, handleRange.endKey, true, nil + } + return rs.records, handleRange.startKey, false, nil + case <-ticker.C: + logutil.BgLogger().Info("[ddl-ingest] cop-request result channel is empty", + zap.Int("id", handleRange.id)) + if _, found := c.results.Load(handleRange.id); found { + c.results.Delete(handleRange.id) + return nil, handleRange.endKey, true, nil + } + } + } } -// fetchRowsFromCop sends a coprocessor request and fetches the first batchCnt rows. -func fetchRowsFromCop(ctx context.Context, copCtx *copContext, startKey, endKey kv.Key, startTS uint64, - buf []*indexRecord, batchCnt int) ([]*indexRecord, kv.Key, bool, error) { - srcResult, err := copCtx.buildTableScan(ctx, startTS, startKey, endKey) - if err != nil { - return nil, nil, false, errors.Trace(err) +type copReqSenderPool struct { + tasksCh chan *reorgBackfillTask + resultsCh chan idxRecResult + results generic.SyncMap[int, struct{}] + + ctx context.Context + copCtx *copContext + startTS uint64 + + senders []*copReqSender + wg sync.WaitGroup + + idxBufPool sync.Pool +} + +type copReqSender struct { + senderPool *copReqSenderPool + + ctx context.Context + cancel context.CancelFunc +} + +func (c *copReqSender) run() { + p := c.senderPool + defer p.wg.Done() + srcChk := renewChunk(nil, p.copCtx.fieldTps) + for { + if util.HasCancelled(c.ctx) { + return + } + task, ok := <-p.tasksCh + if !ok { + return + } + logutil.BgLogger().Info("[ddl-ingest] start a cop-request task", + zap.Int("id", task.id), zap.String("task", task.String())) + rs, err := p.copCtx.buildTableScan(p.ctx, p.startTS, task.startKey, task.excludedEndKey()) + if err != nil { + p.resultsCh <- idxRecResult{id: task.id, err: err} + return + } + var done bool + var total int + for !done { + idxRec := p.idxBufPool.Get().([]*indexRecord) + idxRec = idxRec[:0] + srcChk = renewChunk(srcChk, p.copCtx.fieldTps) + idxRec, done, err = p.copCtx.fetchTableScanResult(p.ctx, rs, srcChk, idxRec) + if err != nil { + p.resultsCh <- idxRecResult{id: task.id, err: err} + return + } + total += len(idxRec) + p.resultsCh <- idxRecResult{id: task.id, records: idxRec, done: done, total: total} + } } - var done bool - buf, done, err = copCtx.fetchTableScanResult(ctx, srcResult, buf, batchCnt) - nextKey := endKey - if !done { - lastHandle := buf[len(buf)-1].handle - prefix := tablecodec.GenTableRecordPrefix(copCtx.tblInfo.ID) - nextKey = tablecodec.EncodeRecordKey(prefix, lastHandle).Next() +} + +// renewChunk creates a new chunk when the reorg batch size is changed. +func renewChunk(oldChk *chunk.Chunk, fts []*types.FieldType) *chunk.Chunk { + newSize := variable.GetDDLReorgBatchSize() + newCap := int(newSize) * copReadBatchFactor + if oldChk == nil || oldChk.Capacity() != newCap { + return chunk.NewChunkWithCapacity(fts, newCap) } - return buf, nextKey, done, err + oldChk.Reset() + return oldChk } +func newCopReqSenderPool(ctx context.Context, copCtx *copContext, startTS uint64) *copReqSenderPool { + return &copReqSenderPool{ + tasksCh: make(chan *reorgBackfillTask, backfillTaskChanSize), + resultsCh: make(chan idxRecResult, backfillTaskChanSize), + results: generic.NewSyncMap[int, struct{}](10), + ctx: ctx, + copCtx: copCtx, + startTS: startTS, + senders: make([]*copReqSender, 0, variable.GetDDLReorgWorkerCounter()), + wg: sync.WaitGroup{}, + idxBufPool: sync.Pool{ + New: func() any { + return make([]*indexRecord, 0, int(variable.GetDDLReorgBatchSize())*copReadBatchFactor) + }, + }, + } +} + +func (c *copReqSenderPool) sendTask(task *reorgBackfillTask) { + c.tasksCh <- task +} + +func (c *copReqSenderPool) adjustSize(n int) { + // Add some senders. + for i := len(c.senders); i < n; i++ { + ctx, cancel := context.WithCancel(c.ctx) + c.senders = append(c.senders, &copReqSender{ + senderPool: c, + ctx: ctx, + cancel: cancel, + }) + c.wg.Add(1) + go c.senders[i].run() + } + // Remove some senders. + if n < len(c.senders) { + for i := n; i < len(c.senders); i++ { + c.senders[i].cancel() + } + c.senders = c.senders[:n] + } +} + +func (c *copReqSenderPool) close() { + logutil.BgLogger().Info("[ddl-ingest] close cop-request sender pool", zap.Int("results not handled", len(c.results.Keys()))) + close(c.tasksCh) + for _, w := range c.senders { + w.cancel() + } + c.wg.Wait() + close(c.resultsCh) +} + +// recycleIdxRecords puts the index record slice back to the pool for reuse. +func (c *copReqSenderPool) recycleIdxRecords(idxRecs []*indexRecord) { + if len(idxRecs) == 0 { + return + } + c.idxBufPool.Put(idxRecs[:0]) +} + +// copContext contains the information that is needed when building a coprocessor request. +// It is unchanged after initialization. type copContext struct { tblInfo *model.TableInfo idxInfo *model.IndexInfo @@ -71,8 +221,6 @@ type copContext struct { colInfos []*model.ColumnInfo fieldTps []*types.FieldType sessCtx sessionctx.Context - - srcChunk *chunk.Chunk } func newCopContext(tblInfo *model.TableInfo, idxInfo *model.IndexInfo, sessCtx sessionctx.Context) *copContext { @@ -99,7 +247,6 @@ func newCopContext(tblInfo *model.TableInfo, idxInfo *model.IndexInfo, sessCtx s colInfos: colInfos, fieldTps: fieldTps, sessCtx: sessCtx, - srcChunk: chunk.NewChunkWithCapacity(fieldTps, variable.DefMaxChunkSize), } return copCtx } @@ -127,30 +274,27 @@ func (c *copContext) buildTableScan(ctx context.Context, startTS uint64, start, } func (c *copContext) fetchTableScanResult(ctx context.Context, result distsql.SelectResult, - buf []*indexRecord, batchCnt int) ([]*indexRecord, bool, error) { + chk *chunk.Chunk, buf []*indexRecord) ([]*indexRecord, bool, error) { sctx := c.sessCtx.GetSessionVars().StmtCtx - for { - err := result.Next(ctx, c.srcChunk) + err := result.Next(ctx, chk) + if err != nil { + return nil, false, errors.Trace(err) + } + if chk.NumRows() == 0 { + return buf, true, nil + } + iter := chunk.NewIterator4Chunk(chk) + for row := iter.Begin(); row != iter.End(); row = iter.Next() { + idxDt, hdDt := extractIdxValsAndHandle(row, c.idxInfo, c.fieldTps) + handle, err := buildHandle(hdDt, c.tblInfo, c.pkInfo, sctx) if err != nil { return nil, false, errors.Trace(err) } - if c.srcChunk.NumRows() == 0 { - return buf, true, nil - } - iter := chunk.NewIterator4Chunk(c.srcChunk) - for row := iter.Begin(); row != iter.End(); row = iter.Next() { - idxDt, hdDt := extractIdxValsAndHandle(row, c.idxInfo, c.fieldTps) - handle, err := buildHandle(hdDt, c.tblInfo, c.pkInfo, sctx) - if err != nil { - return nil, false, errors.Trace(err) - } - rsData := tables.TryGetHandleRestoredDataWrapper(c.tblInfo, hdDt, nil, c.idxInfo) - buf = append(buf, &indexRecord{handle: handle, key: nil, vals: idxDt, rsData: rsData, skip: false}) - if len(buf) >= batchCnt { - return buf, false, nil - } - } + rsData := tables.TryGetHandleRestoredDataWrapper(c.tblInfo, hdDt, nil, c.idxInfo) + buf = append(buf, &indexRecord{handle: handle, key: nil, vals: idxDt, rsData: rsData, skip: false}) } + done := chk.NumRows() < chk.Capacity() + return buf, done, nil } func buildDAGPB(sCtx sessionctx.Context, tblInfo *model.TableInfo, colInfos []*model.ColumnInfo) (*tipb.DAGRequest, error) { @@ -219,3 +363,11 @@ func buildHandle(pkDts []types.Datum, tblInfo *model.TableInfo, } return kv.IntHandle(pkDts[0].GetInt64()), nil } + +type idxRecResult struct { + id int + records []*indexRecord + err error + done bool + total int +} diff --git a/ddl/index_cop_test.go b/ddl/index_cop_test.go index 7f18d415ef8f2..333afa997d3bc 100644 --- a/ddl/index_cop_test.go +++ b/ddl/index_cop_test.go @@ -15,7 +15,6 @@ package ddl_test import ( - "context" "fmt" "strconv" "testing" @@ -43,8 +42,7 @@ func TestAddIndexFetchRowsFromCoprocessor(t *testing.T) { endKey := startKey.PrefixNext() txn, err := store.Begin() require.NoError(t, err) - idxRec, _, done, err := ddl.FetchRowsFromCop4Test(context.Background(), copCtx, startKey, endKey, - txn.StartTS(), nil, 10) + idxRec, done, err := ddl.FetchRowsFromCop4Test(copCtx, startKey, endKey, txn.StartTS(), 10) require.NoError(t, err) require.True(t, done) require.NoError(t, txn.Rollback()) diff --git a/tests/realtikvtest/addindextest/BUILD.bazel b/tests/realtikvtest/addindextest/BUILD.bazel index 1a9951fd86a51..f6bb2f0844404 100644 --- a/tests/realtikvtest/addindextest/BUILD.bazel +++ b/tests/realtikvtest/addindextest/BUILD.bazel @@ -34,9 +34,12 @@ go_test( embed = [":addindextest"], deps = [ "//config", + "//ddl", "//ddl/ingest", + "//ddl/testutil", "//testkit", "//tests/realtikvtest", + "@com_github_pingcap_failpoint//:failpoint", "@com_github_stretchr_testify//require", ], ) diff --git a/tests/realtikvtest/addindextest/integration_test.go b/tests/realtikvtest/addindextest/integration_test.go index c5d9fd2a20ce9..7427f935c78ca 100644 --- a/tests/realtikvtest/addindextest/integration_test.go +++ b/tests/realtikvtest/addindextest/integration_test.go @@ -18,9 +18,13 @@ import ( "fmt" "strings" "sync" + "sync/atomic" "testing" + "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/ddl/ingest" + "github.com/pingcap/tidb/ddl/testutil" "github.com/pingcap/tidb/testkit" "github.com/pingcap/tidb/tests/realtikvtest" "github.com/stretchr/testify/require" @@ -123,6 +127,59 @@ func TestAddIndexIngestWriterCountOnPartitionTable(t *testing.T) { require.True(t, strings.Contains(jobTp, "ingest"), jobTp) } +func TestAddIndexIngestAdjustBackfillWorker(t *testing.T) { + store := realtikvtest.CreateMockStoreAndSetup(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("drop database if exists addindexlit;") + tk.MustExec("create database addindexlit;") + tk.MustExec("use addindexlit;") + tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`) + tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt = 1;") + tk.MustExec("create table t (a int primary key);") + var sb strings.Builder + sb.WriteString("insert into t values ") + for i := 0; i < 20; i++ { + sb.WriteString(fmt.Sprintf("(%d000)", i)) + if i != 19 { + sb.WriteString(",") + } + } + tk.MustExec(sb.String()) + tk.MustQuery("split table t between (0) and (20000) regions 20;").Check(testkit.Rows("19 1")) + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/checkBackfillWorkerNum", `return(true)`)) + done := make(chan error, 1) + atomic.StoreInt32(&ddl.TestCheckWorkerNumber, 1) + testutil.SessionExecInGoroutine(store, "addindexlit", "alter table t add index idx(a);", done) + checkNum := 0 + + running := true + cnt := [3]int{1, 2, 4} + offset := 0 + for running { + select { + case err := <-done: + require.NoError(t, err) + running = false + case wg := <-ddl.TestCheckWorkerNumCh: + offset = (offset + 1) % 3 + tk.MustExec(fmt.Sprintf("set @@global.tidb_ddl_reorg_worker_cnt=%d", cnt[offset])) + atomic.StoreInt32(&ddl.TestCheckWorkerNumber, int32(cnt[offset])/2+1) + checkNum++ + wg.Done() + } + } + + require.Greater(t, checkNum, 5) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/checkBackfillWorkerNum")) + tk.MustExec("admin check table t;") + rows := tk.MustQuery("admin show ddl jobs 1;").Rows() + require.Len(t, rows, 1) + jobTp := rows[0][3].(string) + require.True(t, strings.Contains(jobTp, "ingest"), jobTp) + tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt = 4;") +} + func TestAddIndexIngestAdjustBackfillWorkerCountFail(t *testing.T) { store := realtikvtest.CreateMockStoreAndSetup(t) tk := testkit.NewTestKit(t, store) @@ -147,4 +204,5 @@ func TestAddIndexIngestAdjustBackfillWorkerCountFail(t *testing.T) { require.Len(t, rows, 1) jobTp := rows[0][3].(string) require.True(t, strings.Contains(jobTp, "ingest"), jobTp) + tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt = 4;") }