Skip to content

Commit

Permalink
ddl: use session begin timestamp to read record for adding index (#43639
Browse files Browse the repository at this point in the history
)

close #40074
  • Loading branch information
tangenta authored May 11, 2023
1 parent e6ec225 commit 2e8bc40
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 15 deletions.
1 change: 1 addition & 0 deletions ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ go_test(
"//config",
"//ddl/ingest",
"//ddl/internal/callback",
"//ddl/internal/session",
"//ddl/placement",
"//ddl/schematracker",
"//ddl/testutil",
Expand Down
2 changes: 1 addition & 1 deletion ddl/backfilling_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ func (b *ingestBackfillScheduler) createCopReqSenderPool() (*copReqSenderPool, e
logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender", zap.Error(err))
return nil, err
}
return newCopReqSenderPool(b.ctx, copCtx, sessCtx.GetStore(), b.taskCh, b.checkpointMgr), nil
return newCopReqSenderPool(b.ctx, copCtx, sessCtx.GetStore(), b.taskCh, b.sessPool, b.checkpointMgr), nil
}

func (b *ingestBackfillScheduler) expectedWorkerSize() (readerSize int, writerSize int) {
Expand Down
4 changes: 3 additions & 1 deletion ddl/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"time"

"github.com/pingcap/tidb/ddl/internal/session"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
Expand Down Expand Up @@ -51,7 +52,8 @@ func FetchChunk4Test(copCtx *copContext, tbl table.PhysicalTable, startKey, endK
}
taskCh := make(chan *reorgBackfillTask, 5)
resultCh := make(chan idxRecResult, 5)
pool := newCopReqSenderPool(context.Background(), copCtx, store, taskCh, nil)
sessPool := session.NewSessionPool(nil, store)
pool := newCopReqSenderPool(context.Background(), copCtx, store, taskCh, sessPool, nil)
pool.chunkSender = &resultChanForTest{ch: resultCh}
pool.adjustSize(1)
pool.tasksCh <- task
Expand Down
55 changes: 42 additions & 13 deletions ddl/index_cop.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/ddl/ingest"
sess "github.com/pingcap/tidb/ddl/internal/session"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -70,6 +71,7 @@ type copReqSenderPool struct {
tasksCh chan *reorgBackfillTask
chunkSender chunkSender
checkpointMgr *ingest.CheckpointManager
sessPool *sess.Pool

ctx context.Context
copCtx *copContext
Expand All @@ -92,10 +94,17 @@ type copReqSender struct {
func (c *copReqSender) run() {
p := c.senderPool
defer p.wg.Done()
var curTaskID int
defer util.Recover(metrics.LabelDDL, "copReqSender.run", func() {
p.chunkSender.AddTask(idxRecResult{id: curTaskID, err: dbterror.ErrReorgPanic})
p.chunkSender.AddTask(idxRecResult{err: dbterror.ErrReorgPanic})
}, false)
sessCtx, err := p.sessPool.Get()
if err != nil {
logutil.BgLogger().Error("[ddl-ingest] copReqSender get session from pool failed", zap.Error(err))
p.chunkSender.AddTask(idxRecResult{err: err})
return
}
se := sess.NewSession(sessCtx)
defer p.sessPool.Put(sessCtx)
for {
if util.HasCancelled(c.ctx) {
return
Expand All @@ -110,18 +119,22 @@ func (c *copReqSender) run() {
zap.String("task end key", hex.EncodeToString(task.endKey)))
continue
}
curTaskID = task.id
logutil.BgLogger().Info("[ddl-ingest] start a cop-request task",
zap.Int("id", task.id), zap.String("task", task.String()))
ver, err := p.store.CurrentVersion(kv.GlobalTxnScope)
err := scanRecords(p, task, se)
if err != nil {
p.chunkSender.AddTask(idxRecResult{id: task.id, err: err})
return
}
rs, err := p.copCtx.buildTableScan(p.ctx, ver.Ver, task.startKey, task.excludedEndKey())
}
}

func scanRecords(p *copReqSenderPool, task *reorgBackfillTask, se *sess.Session) error {
logutil.BgLogger().Info("[ddl-ingest] start a cop-request task",
zap.Int("id", task.id), zap.String("task", task.String()))

return wrapInBeginRollback(se, func(startTS uint64) error {
rs, err := p.copCtx.buildTableScan(p.ctx, startTS, task.startKey, task.excludedEndKey())
if err != nil {
p.chunkSender.AddTask(idxRecResult{id: task.id, err: err})
return
return err
}
failpoint.Inject("mockCopSenderPanic", func(val failpoint.Value) {
if val.(bool) {
Expand All @@ -136,10 +149,9 @@ func (c *copReqSender) run() {
srcChk := p.getChunk()
done, err = p.copCtx.fetchTableScanResult(p.ctx, rs, srcChk)
if err != nil {
p.chunkSender.AddTask(idxRecResult{id: task.id, err: err})
p.recycleChunk(srcChk)
terror.Call(rs.Close)
return
return err
}
if p.checkpointMgr != nil {
p.checkpointMgr.UpdateTotal(task.id, srcChk.NumRows(), done)
Expand All @@ -151,11 +163,27 @@ func (c *copReqSender) run() {
p.chunkSender.AddTask(idxRs)
}
terror.Call(rs.Close)
}
return nil
})
}

func wrapInBeginRollback(se *sess.Session, f func(startTS uint64) error) error {
err := se.Begin()
if err != nil {
return errors.Trace(err)
}
defer se.Rollback()
var startTS uint64
sessVars := se.GetSessionVars()
sessVars.TxnCtxMu.Lock()
startTS = sessVars.TxnCtx.StartTS
sessVars.TxnCtxMu.Unlock()
return f(startTS)
}

func newCopReqSenderPool(ctx context.Context, copCtx *copContext, store kv.Storage,
taskCh chan *reorgBackfillTask, checkpointMgr *ingest.CheckpointManager) *copReqSenderPool {
taskCh chan *reorgBackfillTask, sessPool *sess.Pool,
checkpointMgr *ingest.CheckpointManager) *copReqSenderPool {
poolSize := copReadChunkPoolSize()
srcChkPool := make(chan *chunk.Chunk, poolSize)
for i := 0; i < poolSize; i++ {
Expand All @@ -169,6 +197,7 @@ func newCopReqSenderPool(ctx context.Context, copCtx *copContext, store kv.Stora
senders: make([]*copReqSender, 0, variable.GetDDLReorgWorkerCounter()),
wg: sync.WaitGroup{},
srcChkPool: srcChkPool,
sessPool: sessPool,
checkpointMgr: checkpointMgr,
}
}
Expand Down

0 comments on commit 2e8bc40

Please sign in to comment.