-
Notifications
You must be signed in to change notification settings - Fork 5.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ddl: make read and write async during adding index #39249
Changes from 10 commits
26d3d1d
7c527c8
c9c9544
41ac395
4aaba13
c7a725a
87ad3b4
32160ab
c0a3c17
f83c4a9
5ea5435
83eef63
23eb34a
ea47941
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -168,7 +168,7 @@ func (r *reorgBackfillTask) String() string { | |
physicalID := strconv.FormatInt(r.physicalTableID, 10) | ||
startKey := hex.EncodeToString(r.startKey) | ||
endKey := hex.EncodeToString(r.endKey) | ||
rangeStr := "physicalTableID_" + physicalID + "_" + "[" + startKey + "," + endKey | ||
rangeStr := "taskID_" + strconv.Itoa(r.id) + "_physicalTableID_" + physicalID + "_" + "[" + startKey + "," + endKey | ||
if r.endInclude { | ||
return rangeStr + "]" | ||
} | ||
|
@@ -273,7 +273,7 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask, | |
rc.increaseRowCount(int64(taskCtx.addedCount)) | ||
rc.mergeWarnings(taskCtx.warnings, taskCtx.warningsCount) | ||
|
||
if num := result.scanCount - lastLogCount; num >= 30000 { | ||
if num := result.scanCount - lastLogCount; num >= 90000 { | ||
lastLogCount = result.scanCount | ||
logutil.BgLogger().Info("[ddl] backfill worker back fill index", | ||
zap.Int("worker ID", w.id), | ||
|
@@ -439,6 +439,9 @@ func (dc *ddlCtx) sendTasksAndWait(scheduler *backfillScheduler, totalAddedCount | |
batchTasks []*reorgBackfillTask) error { | ||
reorgInfo := scheduler.reorgInfo | ||
for _, task := range batchTasks { | ||
if scheduler.copReqSenderPool != nil { | ||
scheduler.copReqSenderPool.sendTask(task) | ||
} | ||
scheduler.taskCh <- task | ||
Comment on lines
+442
to
445
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should the task be sent to two different channels? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think so. This is because I want to keep the old backfill implementation and introduce as few changes as possible: Old backfill:
New backfill in this PR:
If we only use one channel, then it will be
The backfill worker cannot process the next task until the current task is complete. However, this is not the behavior we want in the cop-request sender pool. To gain better performance, the sender pool should be always filled with tasks. Besides, for one channel solution, it is not easy to set the concurrency of backfill worker and cop-request sender separately. |
||
} | ||
|
||
|
@@ -605,6 +608,8 @@ type backfillScheduler struct { | |
|
||
taskCh chan *reorgBackfillTask | ||
resultCh chan *backfillResult | ||
|
||
copReqSenderPool *copReqSenderPool // for add index in ingest way. | ||
} | ||
|
||
const backfillTaskChanSize = 1024 | ||
|
@@ -658,14 +663,19 @@ func (b *backfillScheduler) workerSize() int { | |
} | ||
|
||
func (b *backfillScheduler) adjustWorkerSize() error { | ||
b.initCopReqSenderPool() | ||
reorgInfo := b.reorgInfo | ||
job := reorgInfo.Job | ||
jc := b.jobCtx | ||
if err := loadDDLReorgVars(b.ctx, b.sessPool); err != nil { | ||
logutil.BgLogger().Error("[ddl] load DDL reorganization variable failed", zap.Error(err)) | ||
} | ||
workerCnt := int(variable.GetDDLReorgWorkerCounter()) | ||
workerCnt = mathutil.Min(workerCnt, b.maxSize) | ||
if b.copReqSenderPool != nil { | ||
workerCnt = mathutil.Min(workerCnt/2+1, b.maxSize) | ||
} else { | ||
workerCnt = mathutil.Min(workerCnt, b.maxSize) | ||
} | ||
// Increase the worker. | ||
for i := len(b.workers); i < workerCnt; i++ { | ||
sessCtx, err := b.newSessCtx() | ||
|
@@ -680,8 +690,12 @@ func (b *backfillScheduler) adjustWorkerSize() error { | |
case typeAddIndexWorker: | ||
idxWorker, err := newAddIndexWorker(sessCtx, i, b.tbl, b.decodeColMap, reorgInfo, jc, job) | ||
if err != nil { | ||
return b.handleCreateBackfillWorkerErr(err) | ||
if b.canSkipError(err) { | ||
continue | ||
} | ||
return err | ||
} | ||
idxWorker.copReqSenderPool = b.copReqSenderPool | ||
worker, runner = idxWorker, idxWorker.backfillWorker | ||
case typeAddIndexMergeTmpWorker: | ||
tmpIdxWorker := newMergeTempIndexWorker(sessCtx, i, b.tbl, reorgInfo, jc) | ||
|
@@ -708,20 +722,66 @@ func (b *backfillScheduler) adjustWorkerSize() error { | |
b.workers = b.workers[:workerCnt] | ||
closeBackfillWorkers(workers) | ||
} | ||
if b.copReqSenderPool != nil { | ||
b.copReqSenderPool.adjustSize(len(b.workers)) | ||
} | ||
return injectCheckBackfillWorkerNum(len(b.workers)) | ||
} | ||
|
||
func (b *backfillScheduler) handleCreateBackfillWorkerErr(err error) error { | ||
if len(b.workers) == 0 { | ||
return errors.Trace(err) | ||
func (b *backfillScheduler) initCopReqSenderPool() { | ||
if b.tp != typeAddIndexWorker || b.reorgInfo.Job.ReorgMeta.ReorgTp != model.ReorgTypeLitMerge || | ||
b.copReqSenderPool != nil || len(b.workers) > 0 { | ||
return | ||
} | ||
indexInfo := model.FindIndexInfoByID(b.tbl.Meta().Indices, b.reorgInfo.currElement.ID) | ||
if indexInfo == nil { | ||
logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender", | ||
zap.Int64("table ID", b.tbl.Meta().ID), zap.Int64("index ID", b.reorgInfo.currElement.ID)) | ||
return | ||
} | ||
sessCtx, err := b.newSessCtx() | ||
if err != nil { | ||
logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender", zap.Error(err)) | ||
return | ||
} | ||
copCtx := newCopContext(b.tbl.Meta(), indexInfo, sessCtx) | ||
if copCtx == nil { | ||
logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender") | ||
return | ||
} | ||
startTS, err := latestStartTS(sessCtx.GetStore()) | ||
if err != nil { | ||
logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender", zap.Error(err)) | ||
return | ||
} | ||
b.copReqSenderPool = newCopReqSenderPool(b.ctx, copCtx, startTS) | ||
} | ||
|
||
func latestStartTS(storage kv.Storage) (startTS uint64, err error) { | ||
tangenta marked this conversation as resolved.
Show resolved
Hide resolved
|
||
txn, err := storage.Begin() | ||
if err != nil { | ||
_ = txn.Rollback() | ||
return 0, err | ||
} | ||
_ = txn.Rollback() | ||
return txn.StartTS(), nil | ||
} | ||
|
||
func (b *backfillScheduler) canSkipError(err error) bool { | ||
if len(b.workers) > 0 { | ||
// The error can be skipped because the rest workers can handle the tasks. | ||
Benjamin2037 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return true | ||
} | ||
logutil.BgLogger().Warn("[ddl] create add index backfill worker failed", | ||
zap.Int("current worker count", len(b.workers)), | ||
zap.Int64("job ID", b.reorgInfo.ID), zap.Error(err)) | ||
return nil | ||
return false | ||
} | ||
|
||
func (b *backfillScheduler) Close() { | ||
if b.copReqSenderPool != nil { | ||
b.copReqSenderPool.close() | ||
} | ||
closeBackfillWorkers(b.workers) | ||
close(b.taskCh) | ||
close(b.resultCh) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For 90000 row print log, it will be how much of interval of time?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Normally, throughput of adding index using ingest is about 200k rows per second.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about we set 200k rows print one log?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe 200k is too large for the old implementation, the average throughput is 20k rows per second.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can create a sliding window to static the throughput. so we can adapt the rows for machine performance.
Lucky, I have create a sliding windows at util/window at this pr