Commit
ddl, parser: Implement the write-reorg state split task related functions, and the related interfaces of backfill worker (#39982)

close #37123
zimulala authored Jan 4, 2023
1 parent e7e7935 commit cb5affb
Showing 15 changed files with 929 additions and 250 deletions.
541 changes: 475 additions & 66 deletions ddl/backfilling.go

Large diffs are not rendered by default.
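
Since the backfilling.go diff is collapsed, here is a heavily hedged sketch of the task record that the rest of this commit refers to as *BackfillJob (see the GetTask/UpdateTask/FinishTask stubs below). Every field name is a guess inferred from how the type is used elsewhere in the diff, not a quote from the file:

// Hypothetical shape of a persisted backfill task; all fields are assumptions
// inferred from the interfaces added in this commit.
type BackfillJob struct {
	ID       int64 // task ID within the DDL job
	JobID    int64 // owning DDL job, used to key jobContext/reorgCtx lookups
	EleID    int64 // element (column or index) being backfilled
	State    model.JobState
	StartKey kv.Key // start of the key range this task covers
	EndKey   kv.Key // end of the key range this task covers
	Meta     *model.BackfillMeta // query text, SQL mode, and similar job metadata
}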

58 changes: 38 additions & 20 deletions ddl/column.go
@@ -811,7 +811,7 @@ func doReorgWorkForModifyColumn(w *worker, d *ddlCtx, t *meta.Meta, job *model.J
if err != nil {
return false, ver, errors.Trace(err)
}
- reorgInfo, err := getReorgInfo(d.jobContext(job), d, rh, job, dbInfo, tbl, BuildElements(changingCol, changingIdxs), false)
+ reorgInfo, err := getReorgInfo(d.jobContext(job.ID), d, rh, job, dbInfo, tbl, BuildElements(changingCol, changingIdxs), false)
if err != nil || reorgInfo.first {
// If we run reorg firstly, we should update the job snapshot version
// and then run the reorg next time.
@@ -1059,7 +1059,7 @@ func (w *worker) updateCurrentElement(t table.Table, reorgInfo *reorgInfo) error
TestReorgGoroutineRunning <- a
for {
time.Sleep(30 * time.Millisecond)
- if w.getReorgCtx(reorgInfo.Job).isReorgCanceled() {
+ if w.getReorgCtx(reorgInfo.Job.ID).isReorgCanceled() {
// Job is cancelled. So it can't be done.
failpoint.Return(dbterror.ErrCancelledDDLJob)
}
@@ -1081,7 +1081,7 @@ func (w *worker) updateCurrentElement(t table.Table, reorgInfo *reorgInfo) error
return errors.Trace(err)
}
//nolint:forcetypeassert
- originalStartHandle, originalEndHandle, err := getTableRange(reorgInfo.d.jobContext(reorgInfo.Job), reorgInfo.d, t.(table.PhysicalTable), currentVer.Ver, reorgInfo.Job.Priority)
+ originalStartHandle, originalEndHandle, err := getTableRange(reorgInfo.d.jobContext(reorgInfo.Job.ID), reorgInfo.d, t.(table.PhysicalTable), currentVer.Ver, reorgInfo.Job.Priority)
if err != nil {
return errors.Trace(err)
}
@@ -1104,11 +1104,11 @@ func (w *worker) updateCurrentElement(t table.Table, reorgInfo *reorgInfo) error
// Then the handle range of the rest elements' is [originalStartHandle, originalEndHandle].
if i == startElementOffsetToResetHandle+1 {
reorgInfo.StartKey, reorgInfo.EndKey = originalStartHandle, originalEndHandle
- w.getReorgCtx(reorgInfo.Job).setNextKey(reorgInfo.StartKey)
+ w.getReorgCtx(reorgInfo.Job.ID).setNextKey(reorgInfo.StartKey)
}

// Update the element in the reorgCtx to keep the atomic access for daemon-worker.
- w.getReorgCtx(reorgInfo.Job).setCurrentElement(reorgInfo.elements[i+1])
+ w.getReorgCtx(reorgInfo.Job.ID).setCurrentElement(reorgInfo.elements[i+1])

// Update the element in the reorgInfo for updating the reorg meta below.
reorgInfo.currElement = reorgInfo.elements[i+1]
@@ -1132,7 +1132,7 @@ func (w *worker) updateCurrentElement(t table.Table, reorgInfo *reorgInfo) error
}

type updateColumnWorker struct {
- *backfillWorker
+ *backfillCtx
oldColInfo *model.ColumnInfo
newColInfo *model.ColumnInfo
metricCounter prometheus.Counter
@@ -1144,11 +1144,10 @@ type updateColumnWorker struct {
rowMap map[int64]types.Datum

// For SQL Mode and warnings.
sqlMode mysql.SQLMode
jobContext *JobContext
}

- func newUpdateColumnWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *JobContext) *updateColumnWorker {
+ func newUpdateColumnWorker(sessCtx sessionctx.Context, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *JobContext) *updateColumnWorker {
if !bytes.Equal(reorgInfo.currElement.TypeKey, meta.ColumnElementKey) {
logutil.BgLogger().Error("Element type for updateColumnWorker incorrect", zap.String("jobQuery", reorgInfo.Query),
zap.String("reorgInfo", reorgInfo.String()))
@@ -1164,21 +1163,40 @@ func newUpdateColumnWorker(sessCtx sessionctx.Context, id int, t table.PhysicalT
}
rowDecoder := decoder.NewRowDecoder(t, t.WritableCols(), decodeColMap)
return &updateColumnWorker{
- backfillWorker: newBackfillWorker(jc.ddlJobCtx, sessCtx, id, t, reorgInfo, typeUpdateColumnWorker),
- oldColInfo: oldCol,
- newColInfo: newCol,
- metricCounter: metrics.BackfillTotalCounter.WithLabelValues(metrics.GenerateReorgLabel("update_col_rate", reorgInfo.SchemaName, t.Meta().Name.String())),
- rowDecoder: rowDecoder,
- rowMap: make(map[int64]types.Datum, len(decodeColMap)),
- sqlMode: reorgInfo.ReorgMeta.SQLMode,
- jobContext: jc,
+ backfillCtx: newBackfillCtx(reorgInfo.d, sessCtx, reorgInfo.ReorgMeta.ReorgTp, reorgInfo.SchemaName, t),
+ oldColInfo: oldCol,
+ newColInfo: newCol,
+ metricCounter: metrics.BackfillTotalCounter.WithLabelValues(metrics.GenerateReorgLabel("update_col_rate", reorgInfo.SchemaName, t.Meta().Name.String())),
+ rowDecoder: rowDecoder,
+ rowMap: make(map[int64]types.Datum, len(decodeColMap)),
+ jobContext: jc,
}
}

func (w *updateColumnWorker) AddMetricInfo(cnt float64) {
w.metricCounter.Add(cnt)
}

+ func (*updateColumnWorker) String() string {
+ return typeUpdateColumnWorker.String()
+ }
+
+ func (*updateColumnWorker) GetTask() (*BackfillJob, error) {
+ panic("[ddl] update column worker GetTask function doesn't implement")
+ }
+
+ func (*updateColumnWorker) UpdateTask(*BackfillJob) error {
+ panic("[ddl] update column worker UpdateTask function doesn't implement")
+ }
+
+ func (*updateColumnWorker) FinishTask(*BackfillJob) error {
+ panic("[ddl] update column worker FinishTask function doesn't implement")
+ }
+
+ func (w *updateColumnWorker) GetCtx() *backfillCtx {
+ return w.backfillCtx
+ }

type rowRecord struct {
key []byte // It's used to lock a record. Record it to reduce the encoding time.
vals []byte // It's the record.
@@ -1204,8 +1222,8 @@ func (w *updateColumnWorker) fetchRowColVals(txn kv.Transaction, taskRange reorg
taskDone := false
var lastAccessedHandle kv.Key
oprStartTime := startTime
- err := iterateSnapshotKeys(w.reorgInfo.d.jobContext(w.reorgInfo.Job), w.sessCtx.GetStore(), w.priority, w.table.RecordPrefix(), txn.StartTS(), taskRange.startKey, taskRange.endKey,
- func(handle kv.Handle, recordKey kv.Key, rawRow []byte) (bool, error) {
+ err := iterateSnapshotKeys(w.GetCtx().jobContext(taskRange.getJobID()), w.sessCtx.GetStore(), taskRange.priority, taskRange.physicalTable.RecordPrefix(),
+ txn.StartTS(), taskRange.startKey, taskRange.endKey, func(handle kv.Handle, recordKey kv.Key, rawRow []byte) (bool, error) {
oprEndTime := time.Now()
logSlowOperations(oprEndTime.Sub(oprStartTime), "iterateSnapshotKeys in updateColumnWorker fetchRowColVals", 0)
oprStartTime = oprEndTime
@@ -1346,8 +1364,8 @@ func (w *updateColumnWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (t
errInTxn = kv.RunInNewTxn(ctx, w.sessCtx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) error {
taskCtx.addedCount = 0
taskCtx.scanCount = 0
- txn.SetOption(kv.Priority, w.priority)
- if tagger := w.reorgInfo.d.getResourceGroupTaggerForTopSQL(w.reorgInfo.Job); tagger != nil {
+ txn.SetOption(kv.Priority, handleRange.priority)
+ if tagger := w.GetCtx().getResourceGroupTaggerForTopSQL(handleRange.getJobID()); tagger != nil {
txn.SetOption(kv.ResourceGroupTagger, tagger)
}

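Note on the block of new methods added to updateColumnWorker above: together with the *backfillWorker to *backfillCtx swap, they suggest a widened, backfiller-style contract in which queue-driven workers implement real task handling while transaction-local workers stub the task methods with panics. A minimal sketch of the implied interface; the name and exact signatures are inferred from this diff rather than copied from ddl/backfilling.go:

// Sketch of the worker contract implied by the methods added in this commit.
// Signatures are assumptions based on the stubs above.
type backfiller interface {
	BackfillDataInTxn(handleRange reorgBackfillTask) (taskCtx backfillTaskContext, errInTxn error)
	AddMetricInfo(cnt float64)
	GetTask() (*BackfillJob, error)     // pull the next backfill task from the job queue
	UpdateTask(task *BackfillJob) error // persist intermediate progress for the task
	FinishTask(task *BackfillJob) error // mark the task as done
	GetCtx() *backfillCtx               // shared per-worker context (session, schema, table)
	String() string                     // worker type, e.g. typeUpdateColumnWorker
}
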
22 changes: 11 additions & 11 deletions ddl/ddl.go
@@ -417,15 +417,15 @@ func (dc *ddlCtx) isOwner() bool {
return isOwner
}

- func (dc *ddlCtx) setDDLLabelForTopSQL(job *model.Job) {
+ func (dc *ddlCtx) setDDLLabelForTopSQL(jobID int64, jobQuery string) {
dc.jobCtx.Lock()
defer dc.jobCtx.Unlock()
- ctx, exists := dc.jobCtx.jobCtxMap[job.ID]
+ ctx, exists := dc.jobCtx.jobCtxMap[jobID]
if !exists {
ctx = NewJobContext()
- dc.jobCtx.jobCtxMap[job.ID] = ctx
+ dc.jobCtx.jobCtxMap[jobID] = ctx
}
- ctx.setDDLLabelForTopSQL(job)
+ ctx.setDDLLabelForTopSQL(jobQuery)
}

func (dc *ddlCtx) setDDLSourceForDiagnosis(job *model.Job) {
@@ -439,10 +439,10 @@ func (dc *ddlCtx) setDDLSourceForDiagnosis(job *model.Job) {
ctx.setDDLLabelForDiagnosis(job)
}

- func (dc *ddlCtx) getResourceGroupTaggerForTopSQL(job *model.Job) tikvrpc.ResourceGroupTagger {
+ func (dc *ddlCtx) getResourceGroupTaggerForTopSQL(jobID int64) tikvrpc.ResourceGroupTagger {
dc.jobCtx.Lock()
defer dc.jobCtx.Unlock()
- ctx, exists := dc.jobCtx.jobCtxMap[job.ID]
+ ctx, exists := dc.jobCtx.jobCtxMap[jobID]
if !exists {
return nil
}
@@ -455,19 +455,19 @@ func (dc *ddlCtx) removeJobCtx(job *model.Job) {
delete(dc.jobCtx.jobCtxMap, job.ID)
}

- func (dc *ddlCtx) jobContext(job *model.Job) *JobContext {
+ func (dc *ddlCtx) jobContext(jobID int64) *JobContext {
dc.jobCtx.RLock()
defer dc.jobCtx.RUnlock()
- if jobContext, exists := dc.jobCtx.jobCtxMap[job.ID]; exists {
+ if jobContext, exists := dc.jobCtx.jobCtxMap[jobID]; exists {
return jobContext
}
return NewJobContext()
}

- func (dc *ddlCtx) getReorgCtx(job *model.Job) *reorgCtx {
+ func (dc *ddlCtx) getReorgCtx(jobID int64) *reorgCtx {
dc.reorgCtx.RLock()
defer dc.reorgCtx.RUnlock()
- return dc.reorgCtx.reorgCtxMap[job.ID]
+ return dc.reorgCtx.reorgCtxMap[jobID]
}

func (dc *ddlCtx) newReorgCtx(r *reorgInfo) *reorgCtx {
@@ -492,7 +492,7 @@ func (dc *ddlCtx) removeReorgCtx(job *model.Job) {
}

func (dc *ddlCtx) notifyReorgCancel(job *model.Job) {
- rc := dc.getReorgCtx(job)
+ rc := dc.getReorgCtx(job.ID)
if rc == nil {
return
}
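
The pattern throughout this file: the jobCtx and reorgCtx maps are now keyed by the raw job ID instead of *model.Job, so a caller holding only an ID (for example, a backfill worker resumed from a persisted task) can look up state without reconstructing the job object. A minimal sketch of the accessor shape, mirroring getReorgCtx above (assumes "sync" is imported; surrounding types are elided):

// Illustrative only: an RWMutex-guarded context map keyed by job ID.
type reorgContexts struct {
	sync.RWMutex
	reorgCtxMap map[int64]*reorgCtx
}

func (rc *reorgContexts) getReorgCtx(jobID int64) *reorgCtx {
	rc.RLock()
	defer rc.RUnlock()
	return rc.reorgCtxMap[jobID] // nil when no reorg is running for this job
}
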
16 changes: 8 additions & 8 deletions ddl/ddl_worker.go
@@ -641,14 +641,14 @@ func isDependencyJobDone(t *meta.Meta, job *model.Job) (bool, error) {
return true, nil
}

- func (w *JobContext) setDDLLabelForTopSQL(job *model.Job) {
- if !topsqlstate.TopSQLEnabled() || job == nil {
+ func (w *JobContext) setDDLLabelForTopSQL(jobQuery string) {
+ if !topsqlstate.TopSQLEnabled() || jobQuery == "" {
return
}

- if job.Query != w.cacheSQL || w.cacheDigest == nil {
- w.cacheNormalizedSQL, w.cacheDigest = parser.NormalizeDigest(job.Query)
- w.cacheSQL = job.Query
+ if jobQuery != w.cacheSQL || w.cacheDigest == nil {
+ w.cacheNormalizedSQL, w.cacheDigest = parser.NormalizeDigest(jobQuery)
+ w.cacheSQL = jobQuery
w.ddlJobCtx = topsql.AttachAndRegisterSQLInfo(context.Background(), w.cacheNormalizedSQL, w.cacheDigest, false)
} else {
topsql.AttachAndRegisterSQLInfo(w.ddlJobCtx, w.cacheNormalizedSQL, w.cacheDigest, false)
@@ -735,10 +735,10 @@ func (w *worker) HandleDDLJobTable(d *ddlCtx, job *model.Job) (int64, error) {
if w.tp == addIdxWorker && job.IsRunning() {
txn.SetDiskFullOpt(kvrpcpb.DiskFullOpt_NotAllowedOnFull)
}
- w.setDDLLabelForTopSQL(job)
+ w.setDDLLabelForTopSQL(job.ID, job.Query)
w.setDDLSourceForDiagnosis(job)
- jobContext := w.jobContext(job)
- if tagger := w.getResourceGroupTaggerForTopSQL(job); tagger != nil {
+ jobContext := w.jobContext(job.ID)
+ if tagger := w.getResourceGroupTaggerForTopSQL(job.ID); tagger != nil {
txn.SetOption(kv.ResourceGroupTagger, tagger)
}
t := meta.NewMeta(txn)
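
setDDLLabelForTopSQL now receives the query text directly and keeps the digest cache, so only the first call for a given query pays for parser.NormalizeDigest. One consequence of passing (job.ID, job.Query) instead of *model.Job is that the new backfill path can label transactions from a persisted task alone. A hedged usage sketch; the BackfillJob field names (JobID, Meta.Query) are assumptions, not taken from this diff:

// Illustrative only: label a backfill transaction for Top SQL given a
// persisted task instead of a *model.Job. Field names are assumed.
func labelBackfillTxn(d *ddlCtx, txn kv.Transaction, bfJob *BackfillJob) {
	d.setDDLLabelForTopSQL(bfJob.JobID, bfJob.Meta.Query)
	if tagger := d.getResourceGroupTaggerForTopSQL(bfJob.JobID); tagger != nil {
		txn.SetOption(kv.ResourceGroupTagger, tagger)
	}
}
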
4 changes: 1 addition & 3 deletions ddl/ddl_workerpool_test.go
@@ -19,7 +19,6 @@ import (
"testing"

"github.com/ngaut/pools"
- "github.com/pingcap/tidb/parser/model"
"github.com/stretchr/testify/require"
)

@@ -36,10 +35,9 @@ func TestDDLWorkerPool(t *testing.T) {
}

func TestBackfillWorkerPool(t *testing.T) {
- reorgInfo := &reorgInfo{Job: &model.Job{ID: 1}}
f := func() func() (pools.Resource, error) {
return func() (pools.Resource, error) {
- wk := newBackfillWorker(context.Background(), nil, 1, nil, reorgInfo, typeAddIndexWorker)
+ wk := newBackfillWorker(context.Background(), 1, nil)
return wk, nil
}
}
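
The slimmer newBackfillWorker(ctx, id, table) signature means the pool factory no longer needs a reorgInfo or worker type when it constructs a worker; binding to a concrete task happens after the worker leaves the pool. For reference, a sketch of the factory shape using ngaut/pools directly (the test exercises TiDB's own pool wrapper; the capacities here are arbitrary):

// Sketch: a resource pool of generic backfill workers. The table argument is
// nil here, as in the test; real use binds a table to the worker later.
factory := func() (pools.Resource, error) {
	return newBackfillWorker(context.Background(), 1, nil), nil
}
pool := pools.NewResourcePool(factory, 4, 4, 0) // capacity 4, maxCap 4, no idle timeout
defer pool.Close()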
(The remaining changed files in this commit are not rendered here.)