diff --git a/ddl/backfilling.go b/ddl/backfilling.go index 192840f35f5a5..30d071e8914a7 100644 --- a/ddl/backfilling.go +++ b/ddl/backfilling.go @@ -902,6 +902,323 @@ func injectCheckBackfillWorkerNum(curWorkerSize int, isMergeWorker bool) error { return nil } +<<<<<<< HEAD +======= +func addBatchBackfillJobs(sess *session, bfWorkerType backfillerType, reorgInfo *reorgInfo, notDistTask bool, + batchTasks []*reorgBackfillTask, bJobs []*BackfillJob, isUnique bool, id *int64) error { + bJobs = bJobs[:0] + instanceID := "" + if notDistTask { + instanceID = reorgInfo.d.uuid + } + // TODO: Adjust the number of ranges(region) for each task. + for _, task := range batchTasks { + bm := &model.BackfillMeta{ + PhysicalTableID: reorgInfo.PhysicalTableID, + IsUnique: isUnique, + EndInclude: task.endInclude, + ReorgTp: reorgInfo.Job.ReorgMeta.ReorgTp, + SQLMode: reorgInfo.ReorgMeta.SQLMode, + Location: reorgInfo.ReorgMeta.Location, + JobMeta: &model.JobMeta{ + SchemaID: reorgInfo.Job.SchemaID, + TableID: reorgInfo.Job.TableID, + Query: reorgInfo.Job.Query, + }, + } + bj := &BackfillJob{ + ID: *id, + JobID: reorgInfo.Job.ID, + EleID: reorgInfo.currElement.ID, + EleKey: reorgInfo.currElement.TypeKey, + Tp: bfWorkerType, + State: model.JobStateNone, + InstanceID: instanceID, + CurrKey: task.startKey, + StartKey: task.startKey, + EndKey: task.endKey, + Meta: bm, + } + *id++ + bJobs = append(bJobs, bj) + } + if err := AddBackfillJobs(sess, bJobs); err != nil { + return errors.Trace(err) + } + return nil +} + +func (*ddlCtx) splitTableToBackfillJobs(sess *session, reorgInfo *reorgInfo, pTbl table.PhysicalTable, isUnique bool, + bfWorkerType backfillerType, startKey kv.Key, currBackfillJobID int64) error { + endKey := reorgInfo.EndKey + isFirstOps := true + bJobs := make([]*BackfillJob, 0, genTaskBatch) + for { + kvRanges, err := splitTableRanges(pTbl, reorgInfo.d.store, startKey, endKey) + if err != nil { + return errors.Trace(err) + } + batchTasks := getBatchTasks(pTbl, reorgInfo, kvRanges, genTaskBatch) + if len(batchTasks) == 0 { + break + } + notNeedDistProcess := isFirstOps && (len(kvRanges) < minDistTaskCnt) + if err = addBatchBackfillJobs(sess, bfWorkerType, reorgInfo, notNeedDistProcess, batchTasks, bJobs, isUnique, &currBackfillJobID); err != nil { + return errors.Trace(err) + } + isFirstOps = false + + remains := kvRanges[len(batchTasks):] + // TODO: After adding backfillCh do asyncNotify(dc.backfillJobCh). + logutil.BgLogger().Info("[ddl] split backfill jobs to the backfill table", + zap.Int("batchTasksCnt", len(batchTasks)), + zap.Int("totalRegionCnt", len(kvRanges)), + zap.Int("remainRegionCnt", len(remains)), + zap.String("startHandle", hex.EncodeToString(startKey)), + zap.String("endHandle", hex.EncodeToString(endKey))) + + if len(remains) == 0 { + break + } + + for { + bJobCnt, err := checkBackfillJobCount(sess, reorgInfo.Job.ID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey) + if err != nil { + return errors.Trace(err) + } + if bJobCnt < minGenTaskBatch { + break + } + time.Sleep(retrySQLInterval) + } + startKey = remains[0].StartKey + } + return nil +} + +func (dc *ddlCtx) controlWritePhysicalTableRecord(sess *session, t table.PhysicalTable, bfWorkerType backfillerType, reorgInfo *reorgInfo) error { + startKey, endKey := reorgInfo.StartKey, reorgInfo.EndKey + if startKey == nil && endKey == nil { + return nil + } + + if err := dc.isReorgRunnable(reorgInfo.Job.ID); err != nil { + return errors.Trace(err) + } + + currBackfillJobID := int64(1) + err := checkAndHandleInterruptedBackfillJobs(sess, reorgInfo.Job.ID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey) + if err != nil { + return errors.Trace(err) + } + maxBfJob, err := GetMaxBackfillJob(sess, reorgInfo.Job.ID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey) + if err != nil { + return errors.Trace(err) + } + if maxBfJob != nil { + startKey = maxBfJob.EndKey + currBackfillJobID = maxBfJob.ID + 1 + } + + var isUnique bool + if bfWorkerType == typeAddIndexWorker { + idxInfo := model.FindIndexInfoByID(t.Meta().Indices, reorgInfo.currElement.ID) + isUnique = idxInfo.Unique + } + err = dc.splitTableToBackfillJobs(sess, reorgInfo, t, isUnique, bfWorkerType, startKey, currBackfillJobID) + if err != nil { + return errors.Trace(err) + } + + var backfillJobFinished bool + jobID := reorgInfo.Job.ID + ticker := time.NewTicker(300 * time.Millisecond) + defer ticker.Stop() + for { + if err := dc.isReorgRunnable(reorgInfo.Job.ID); err != nil { + return errors.Trace(err) + } + + select { + case <-ticker.C: + if !backfillJobFinished { + err := checkAndHandleInterruptedBackfillJobs(sess, jobID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey) + if err != nil { + logutil.BgLogger().Warn("[ddl] finish interrupted backfill jobs", zap.Int64("job ID", jobID), zap.Error(err)) + return errors.Trace(err) + } + + bfJob, err := getBackfillJobWithRetry(sess, BackfillTable, jobID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey, false) + if err != nil { + logutil.BgLogger().Info("[ddl] getBackfillJobWithRetry failed", zap.Int64("job ID", jobID), zap.Error(err)) + return errors.Trace(err) + } + if bfJob == nil { + backfillJobFinished = true + logutil.BgLogger().Info("[ddl] finish backfill jobs", zap.Int64("job ID", jobID)) + } + } + if backfillJobFinished { + // TODO: Consider whether these backfill jobs are always out of sync. + isSynced, err := checkJobIsSynced(sess, jobID) + if err != nil { + logutil.BgLogger().Warn("[ddl] checkJobIsSynced failed", zap.Int64("job ID", jobID), zap.Error(err)) + return errors.Trace(err) + } + if isSynced { + logutil.BgLogger().Info("[ddl] sync backfill jobs", zap.Int64("job ID", jobID)) + return nil + } + } + case <-dc.ctx.Done(): + return dc.ctx.Err() + } + } +} + +func checkJobIsSynced(sess *session, jobID int64) (bool, error) { + var err error + var unsyncedInstanceIDs []string + for i := 0; i < retrySQLTimes; i++ { + unsyncedInstanceIDs, err = getUnsyncedInstanceIDs(sess, jobID, "check_backfill_history_job_sync") + if err == nil && len(unsyncedInstanceIDs) == 0 { + return true, nil + } + + logutil.BgLogger().Info("[ddl] checkJobIsSynced failed", + zap.Strings("unsyncedInstanceIDs", unsyncedInstanceIDs), zap.Int("tryTimes", i), zap.Error(err)) + time.Sleep(retrySQLInterval) + } + + return false, errors.Trace(err) +} + +func checkAndHandleInterruptedBackfillJobs(sess *session, jobID, currEleID int64, currEleKey []byte) (err error) { + var bJobs []*BackfillJob + for i := 0; i < retrySQLTimes; i++ { + bJobs, err = GetInterruptedBackfillJobsForOneEle(sess, jobID, currEleID, currEleKey) + if err == nil { + break + } + logutil.BgLogger().Info("[ddl] getInterruptedBackfillJobsForOneEle failed", zap.Error(err)) + time.Sleep(retrySQLInterval) + } + if err != nil { + return errors.Trace(err) + } + if len(bJobs) == 0 { + return nil + } + + for i := 0; i < retrySQLTimes; i++ { + err = MoveBackfillJobsToHistoryTable(sess, bJobs[0]) + if err == nil { + return errors.Errorf(bJobs[0].Meta.ErrMsg) + } + logutil.BgLogger().Info("[ddl] MoveBackfillJobsToHistoryTable failed", zap.Error(err)) + time.Sleep(retrySQLInterval) + } + return errors.Trace(err) +} + +func checkBackfillJobCount(sess *session, jobID, currEleID int64, currEleKey []byte) (backfillJobCnt int, err error) { + err = checkAndHandleInterruptedBackfillJobs(sess, jobID, currEleID, currEleKey) + if err != nil { + return 0, errors.Trace(err) + } + + backfillJobCnt, err = GetBackfillJobCount(sess, BackfillTable, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = '%s'", + jobID, currEleID, currEleKey), "check_backfill_job_count") + if err != nil { + return 0, errors.Trace(err) + } + + return backfillJobCnt, nil +} + +func getBackfillJobWithRetry(sess *session, tableName string, jobID, currEleID int64, currEleKey []byte, isDesc bool) (*BackfillJob, error) { + var err error + var bJobs []*BackfillJob + descStr := "" + if isDesc { + descStr = "order by id desc" + } + for i := 0; i < retrySQLTimes; i++ { + bJobs, err = GetBackfillJobs(sess, tableName, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = '%s' %s limit 1", + jobID, currEleID, currEleKey, descStr), "check_backfill_job_state") + if err != nil { + logutil.BgLogger().Warn("[ddl] GetBackfillJobs failed", zap.Error(err)) + continue + } + + if len(bJobs) != 0 { + return bJobs[0], nil + } + break + } + return nil, errors.Trace(err) +} + +// GetMaxBackfillJob gets the max backfill job in BackfillTable and BackfillHistoryTable. +func GetMaxBackfillJob(sess *session, jobID, currEleID int64, currEleKey []byte) (*BackfillJob, error) { + bfJob, err := getBackfillJobWithRetry(sess, BackfillTable, jobID, currEleID, currEleKey, true) + if err != nil { + return nil, errors.Trace(err) + } + hJob, err := getBackfillJobWithRetry(sess, BackfillHistoryTable, jobID, currEleID, currEleKey, true) + if err != nil { + return nil, errors.Trace(err) + } + + if bfJob == nil { + return hJob, nil + } + if hJob == nil { + return bfJob, nil + } + if bfJob.ID > hJob.ID { + return bfJob, nil + } + return hJob, nil +} + +// MoveBackfillJobsToHistoryTable moves backfill table jobs to the backfill history table. +func MoveBackfillJobsToHistoryTable(sctx sessionctx.Context, bfJob *BackfillJob) error { + s, ok := sctx.(*session) + if !ok { + return errors.Errorf("sess ctx:%#v convert session failed", sctx) + } + + return s.runInTxn(func(se *session) error { + // TODO: Consider batch by batch update backfill jobs and insert backfill history jobs. + bJobs, err := GetBackfillJobs(se, BackfillTable, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = '%s'", + bfJob.JobID, bfJob.EleID, bfJob.EleKey), "update_backfill_job") + if err != nil { + return errors.Trace(err) + } + if len(bJobs) == 0 { + return nil + } + + txn, err := se.txn() + if err != nil { + return errors.Trace(err) + } + startTS := txn.StartTS() + err = RemoveBackfillJob(se, true, bJobs[0]) + if err == nil { + for _, bj := range bJobs { + bj.State = model.JobStateCancelled + bj.FinishTS = startTS + } + err = AddBackfillHistoryJob(se, bJobs) + } + logutil.BgLogger().Info("[ddl] move backfill jobs to history table", zap.Int("job count", len(bJobs))) + return errors.Trace(err) + }) +} + +>>>>>>> eb35c773b51 (ddl: avoid commit conflicts when updating/delete from mysql.tidb_ddl_reorg. (#38738)) // recordIterFunc is used for low-level record iteration. type recordIterFunc func(h kv.Handle, rowKey kv.Key, rawRecord []byte) (more bool, err error) diff --git a/ddl/column.go b/ddl/column.go index 5be16eb62eafd..bd1087258f8d7 100644 --- a/ddl/column.go +++ b/ddl/column.go @@ -811,6 +811,7 @@ func doReorgWorkForModifyColumnMultiSchema(w *worker, d *ddlCtx, t *meta.Meta, j func doReorgWorkForModifyColumn(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, oldCol, changingCol *model.ColumnInfo, changingIdxs []*model.IndexInfo) (done bool, ver int64, err error) { job.ReorgMeta.ReorgTp = model.ReorgTypeTxn +<<<<<<< HEAD var rh *reorgHandler if w.concurrentDDL { sctx, err1 := w.sessPool.get() @@ -834,6 +835,18 @@ func doReorgWorkForModifyColumn(w *worker, d *ddlCtx, t *meta.Meta, job *model.J rh = newReorgHandler(newMeta, newSession(sctx), w.concurrentDDL) } else { rh = newReorgHandler(t, w.sess, w.concurrentDDL) +======= + sctx, err1 := w.sessPool.get() + if err1 != nil { + err = errors.Trace(err1) + return + } + defer w.sessPool.put(sctx) + rh := newReorgHandler(newSession(sctx)) + dbInfo, err := t.GetDatabase(job.SchemaID) + if err != nil { + return false, ver, errors.Trace(err) +>>>>>>> eb35c773b51 (ddl: avoid commit conflicts when updating/delete from mysql.tidb_ddl_reorg. (#38738)) } reorgInfo, err := getReorgInfo(d.jobContext(job), d, rh, job, tbl, BuildElements(changingCol, changingIdxs), false) if err != nil || reorgInfo.first { diff --git a/ddl/column_type_change_test.go b/ddl/column_type_change_test.go index 486c6dc520ba8..aa0ecf38e8f26 100644 --- a/ddl/column_type_change_test.go +++ b/ddl/column_type_change_test.go @@ -2427,6 +2427,7 @@ func TestColumnTypeChangeTimestampToInt(t *testing.T) { tk.MustExec("admin check table t") } +<<<<<<< HEAD func TestFixDDLTxnWillConflictWithReorgTxnNotConcurrent(t *testing.T) { store := testkit.CreateMockStore(t) tk0 := testkit.NewTestKit(t, store) @@ -2448,6 +2449,8 @@ func TestFixDDLTxnWillConflictWithReorgTxnNotConcurrent(t *testing.T) { tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1690 2 warnings with this error code, first warning: constant 128 overflows tinyint")) } +======= +>>>>>>> eb35c773b51 (ddl: avoid commit conflicts when updating/delete from mysql.tidb_ddl_reorg. (#38738)) func TestFixDDLTxnWillConflictWithReorgTxn(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) @@ -2455,7 +2458,10 @@ func TestFixDDLTxnWillConflictWithReorgTxn(t *testing.T) { tk.MustExec("create table t (a int)") tk.MustExec("set global tidb_ddl_enable_fast_reorg = OFF") +<<<<<<< HEAD defer tk.MustExec("set global tidb_ddl_enable_fast_reorg = default") +======= +>>>>>>> eb35c773b51 (ddl: avoid commit conflicts when updating/delete from mysql.tidb_ddl_reorg. (#38738)) tk.MustExec("alter table t add index(a)") tk.MustExec("set @@sql_mode=''") tk.MustExec("insert into t values(128),(129)") diff --git a/ddl/db_partition_test.go b/ddl/db_partition_test.go index 509ff005707bd..58643dea532df 100644 --- a/ddl/db_partition_test.go +++ b/ddl/db_partition_test.go @@ -4556,6 +4556,7 @@ func TestAlterModifyPartitionColTruncateWarning(t *testing.T) { "Warning 1265 Data truncated for column 'a', value is ' 654321'")) } +<<<<<<< HEAD func TestIssue40135Ver2(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) @@ -4593,6 +4594,8 @@ func TestIssue40135Ver2(t *testing.T) { tk.MustExec("admin check table t40135") } +======= +>>>>>>> eb35c773b51 (ddl: avoid commit conflicts when updating/delete from mysql.tidb_ddl_reorg. (#38738)) func TestAlterModifyColumnOnPartitionedTableRename(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) diff --git a/ddl/db_test.go b/ddl/db_test.go index d36de3a425025..f1eaacfa6630f 100644 --- a/ddl/db_test.go +++ b/ddl/db_test.go @@ -620,10 +620,7 @@ func TestAddExpressionIndexRollback(t *testing.T) { // Check whether the reorg information is cleaned up. err := sessiontxn.NewTxn(context.Background(), ctx) require.NoError(t, err) - txn, err := ctx.Txn(true) - require.NoError(t, err) - m := meta.NewMeta(txn) - element, start, end, physicalID, err := ddl.NewReorgHandlerForTest(m, testkit.NewTestKit(t, store).Session()).GetDDLReorgHandle(currJob) + element, start, end, physicalID, err := ddl.NewReorgHandlerForTest(testkit.NewTestKit(t, store).Session()).GetDDLReorgHandle(currJob) require.True(t, meta.ErrDDLReorgElementNotExist.Equal(err)) require.Nil(t, element) require.Nil(t, start) diff --git a/ddl/ddl.go b/ddl/ddl.go index 136c7abf2bf0f..df544d7d0ed8a 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -1446,7 +1446,11 @@ func GetDDLInfo(s sessionctx.Context) (*Info, error) { return info, nil } +<<<<<<< HEAD _, info.ReorgHandle, _, _, err = newReorgHandler(t, sess, enable).GetDDLReorgHandle(reorgJob) +======= + _, info.ReorgHandle, _, _, err = newReorgHandler(sess).GetDDLReorgHandle(reorgJob) +>>>>>>> eb35c773b51 (ddl: avoid commit conflicts when updating/delete from mysql.tidb_ddl_reorg. (#38738)) if err != nil { if meta.ErrDDLReorgElementNotExist.Equal(err) { return info, nil diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index d86b3315ff217..6e71a417ee730 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -775,7 +775,11 @@ func (w *worker) HandleJobDone(d *ddlCtx, job *model.Job, t *meta.Meta) error { if err != nil { return err } +<<<<<<< HEAD CleanupDDLReorgHandles(job, w, t) +======= + CleanupDDLReorgHandles(job, w.sess) +>>>>>>> eb35c773b51 (ddl: avoid commit conflicts when updating/delete from mysql.tidb_ddl_reorg. (#38738)) asyncNotify(d.ddlJobDoneCh) return nil } diff --git a/ddl/index.go b/ddl/index.go index ec09a3c802379..ad346d9ac6e97 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -899,6 +899,7 @@ func convertToKeyExistsErr(originErr error, idxInfo *model.IndexInfo, tblInfo *m func runReorgJobAndHandleErr(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, indexInfo *model.IndexInfo, mergingTmpIdx bool) (done bool, ver int64, err error) { elements := []*meta.Element{{ID: indexInfo.ID, TypeKey: meta.IndexElementKey}} +<<<<<<< HEAD var rh *reorgHandler if w.concurrentDDL { sctx, err1 := w.sessPool.get() @@ -922,6 +923,18 @@ func runReorgJobAndHandleErr(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, rh = newReorgHandler(newMeta, sess, w.concurrentDDL) } else { rh = newReorgHandler(t, w.sess, w.concurrentDDL) +======= + sctx, err1 := w.sessPool.get() + if err1 != nil { + err = err1 + return + } + defer w.sessPool.put(sctx) + rh := newReorgHandler(newSession(sctx)) + dbInfo, err := t.GetDatabase(job.SchemaID) + if err != nil { + return false, ver, errors.Trace(err) +>>>>>>> eb35c773b51 (ddl: avoid commit conflicts when updating/delete from mysql.tidb_ddl_reorg. (#38738)) } failpoint.Inject("mockDMLExecutionStateMerging", func(val failpoint.Value) { @@ -1311,6 +1324,62 @@ func (w *baseIndexWorker) AddMetricInfo(cnt float64) { w.metricCounter.Add(cnt) } +<<<<<<< HEAD +======= +func (*baseIndexWorker) GetTask() (*BackfillJob, error) { + return nil, nil +} + +func (w *baseIndexWorker) String() string { + return w.tp.String() +} + +func (w *baseIndexWorker) UpdateTask(bfJob *BackfillJob) error { + s := newSession(w.backfillCtx.sessCtx) + + return s.runInTxn(func(se *session) error { + jobs, err := GetBackfillJobs(se, BackfillTable, fmt.Sprintf("ddl_job_id = %d and ele_id = %d and ele_key = '%s' and id = %d", + bfJob.JobID, bfJob.EleID, bfJob.EleKey, bfJob.ID), "update_backfill_task") + if err != nil { + return err + } + if len(jobs) == 0 { + return dbterror.ErrDDLJobNotFound.FastGen("get zero backfill job") + } + if jobs[0].InstanceID != bfJob.InstanceID { + return dbterror.ErrDDLJobNotFound.FastGenByArgs(fmt.Sprintf("get a backfill job %v, want instance ID %s", jobs[0], bfJob.InstanceID)) + } + + currTime, err := GetOracleTimeWithStartTS(se) + if err != nil { + return err + } + bfJob.InstanceLease = GetLeaseGoTime(currTime, InstanceLease) + return updateBackfillJob(se, BackfillTable, bfJob, "update_backfill_task") + }) +} + +func (w *baseIndexWorker) FinishTask(bfJob *BackfillJob) error { + s := newSession(w.backfillCtx.sessCtx) + return s.runInTxn(func(se *session) error { + txn, err := se.txn() + if err != nil { + return errors.Trace(err) + } + bfJob.FinishTS = txn.StartTS() + err = RemoveBackfillJob(se, false, bfJob) + if err != nil { + return err + } + return AddBackfillHistoryJob(se, []*BackfillJob{bfJob}) + }) +} + +func (w *baseIndexWorker) GetCtx() *backfillCtx { + return w.backfillCtx +} + +>>>>>>> eb35c773b51 (ddl: avoid commit conflicts when updating/delete from mysql.tidb_ddl_reorg. (#38738)) // mockNotOwnerErrOnce uses to make sure `notOwnerErr` only mock error once. var mockNotOwnerErrOnce uint32 diff --git a/ddl/job_table.go b/ddl/job_table.go index 34e2215cbf5f1..d10ff1bfae0b3 100644 --- a/ddl/job_table.go +++ b/ddl/job_table.go @@ -515,9 +515,21 @@ func (d *ddl) MoveJobFromQueue2Table(inBootstrap bool) error { if err != nil { return err } +<<<<<<< HEAD defer d.sessPool.put(sctx) sess := newSession(sctx) return sess.runInTxn(func(se *session) error { +======= + _, err = sess.execute(context.Background(), sql, label) + return errors.Trace(err) +} + +// AddBackfillJobs adds the backfill jobs to the tidb_ddl_backfill table. +func AddBackfillJobs(s *session, backfillJobs []*BackfillJob) error { + label := fmt.Sprintf("add_%s_job", BackfillTable) + // Do runInTxn to get StartTS. + return s.runInTxn(func(se *session) error { +>>>>>>> eb35c773b51 (ddl: avoid commit conflicts when updating/delete from mysql.tidb_ddl_reorg. (#38738)) txn, err := se.txn() if err != nil { return errors.Trace(err) @@ -531,6 +543,7 @@ func (d *ddl) MoveJobFromQueue2Table(inBootstrap bool) error { if err != nil { return errors.Trace(err) } +<<<<<<< HEAD for _, tp := range []workerType{addIdxWorker, generalWorker} { t := newMetaWithQueueTp(txn, tp) jobs, err := t.GetAllDDLJobsInQueue() @@ -561,6 +574,91 @@ func (d *ddl) MoveJobFromQueue2Table(inBootstrap bool) error { if err != nil { return errors.Trace(err) } +======= + _, err = se.execute(context.Background(), sql, label) + return errors.Trace(err) + }) +} + +// GetBackfillJobsForOneEle batch gets the backfill jobs in the tblName table that contains only one element. +func GetBackfillJobsForOneEle(s *session, batch int, excludedJobIDs []int64, lease time.Duration) ([]*BackfillJob, error) { + eJobIDsBuilder := strings.Builder{} + for i, id := range excludedJobIDs { + if i == 0 { + eJobIDsBuilder.WriteString(" and ddl_job_id not in (") + } + eJobIDsBuilder.WriteString(strconv.Itoa(int(id))) + if i == len(excludedJobIDs)-1 { + eJobIDsBuilder.WriteString(")") + } else { + eJobIDsBuilder.WriteString(", ") + } + } + + var err error + var bJobs []*BackfillJob + err = s.runInTxn(func(se *session) error { + currTime, err := GetOracleTimeWithStartTS(se) + if err != nil { + return err + } + + bJobs, err = GetBackfillJobs(se, BackfillTable, + fmt.Sprintf("(exec_ID = '' or exec_lease < '%v') %s order by ddl_job_id, ele_key, ele_id limit %d", + currTime.Add(-lease), eJobIDsBuilder.String(), batch), "get_backfill_job") + return err + }) + if err != nil || len(bJobs) == 0 { + return nil, err + } + + validLen := 1 + firstJobID, firstEleID, firstEleKey := bJobs[0].JobID, bJobs[0].EleID, bJobs[0].EleKey + for i := 1; i < len(bJobs); i++ { + if bJobs[i].JobID != firstJobID || bJobs[i].EleID != firstEleID || !bytes.Equal(bJobs[i].EleKey, firstEleKey) { + break + } + validLen++ + } + + return bJobs[:validLen], nil +} + +// GetAndMarkBackfillJobsForOneEle batch gets the backfill jobs in the tblName table that contains only one element, +// and update these jobs with instance ID and lease. +func GetAndMarkBackfillJobsForOneEle(s *session, batch int, jobID int64, uuid string, lease time.Duration) ([]*BackfillJob, error) { + var validLen int + var bJobs []*BackfillJob + err := s.runInTxn(func(se *session) error { + currTime, err := GetOracleTimeWithStartTS(se) + if err != nil { + return err + } + + bJobs, err = GetBackfillJobs(se, BackfillTable, + fmt.Sprintf("(exec_ID = '' or exec_lease < '%v') and ddl_job_id = %d order by ddl_job_id, ele_key, ele_id limit %d", + currTime.Add(-lease), jobID, batch), "get_mark_backfill_job") + if err != nil { + return err + } + if len(bJobs) == 0 { + return dbterror.ErrDDLJobNotFound.FastGen("get zero backfill job") + } + + validLen = 0 + firstJobID, firstEleID, firstEleKey := bJobs[0].JobID, bJobs[0].EleID, bJobs[0].EleKey + for i := 0; i < len(bJobs); i++ { + if bJobs[i].JobID != firstJobID || bJobs[i].EleID != firstEleID || !bytes.Equal(bJobs[i].EleKey, firstEleKey) { + break + } + validLen++ + + bJobs[i].InstanceID = uuid + bJobs[i].InstanceLease = GetLeaseGoTime(currTime, lease) + // TODO: batch update + if err = updateBackfillJob(se, BackfillTable, bJobs[i], "get_mark_backfill_job"); err != nil { + return err +>>>>>>> eb35c773b51 (ddl: avoid commit conflicts when updating/delete from mysql.tidb_ddl_reorg. (#38738)) } } diff --git a/ddl/modify_column_test.go b/ddl/modify_column_test.go index b52276408b754..e1b445acdf8e0 100644 --- a/ddl/modify_column_test.go +++ b/ddl/modify_column_test.go @@ -119,6 +119,7 @@ func TestModifyColumnReorgInfo(t *testing.T) { // check the consistency of the tables. currJobID := strconv.FormatInt(currJob.ID, 10) tk.MustQuery("select job_id, reorg, schema_ids, table_ids, type, processing from mysql.tidb_ddl_job where job_id = " + currJobID).Check(testkit.Rows()) +<<<<<<< HEAD /* // Commented this out, since it gives different result in CI in release-6.5 tk.MustQuery("select job_id from mysql.tidb_ddl_history where job_id = " + currJobID).Check(testkit.Rows(currJobID)) @@ -135,6 +136,18 @@ func TestModifyColumnReorgInfo(t *testing.T) { require.Nil(t, end) require.Zero(t, physicalID) */ +======= + tk.MustQuery("select job_id from mysql.tidb_ddl_history where job_id = " + currJobID).Check(testkit.Rows(currJobID)) + tk.MustQuery("select job_id, ele_id, ele_type, physical_id from mysql.tidb_ddl_reorg where job_id = " + currJobID).Check(testkit.Rows()) + require.NoError(t, sessiontxn.NewTxn(context.Background(), ctx)) + e, start, end, physicalID, err := ddl.NewReorgHandlerForTest(testkit.NewTestKit(t, store).Session()).GetDDLReorgHandle(currJob) + require.Error(t, err, "Error not ErrDDLReorgElementNotExists, found orphan row in tidb_ddl_reorg for job.ID %d: e: '%s', physicalID: %d, start: 0x%x end: 0x%x", currJob.ID, e, physicalID, start, end) + require.True(t, meta.ErrDDLReorgElementNotExist.Equal(err)) + require.Nil(t, e) + require.Nil(t, start) + require.Nil(t, end) + require.Zero(t, physicalID) +>>>>>>> eb35c773b51 (ddl: avoid commit conflicts when updating/delete from mysql.tidb_ddl_reorg. (#38738)) } expectedElements := []*meta.Element{ {ID: 4, TypeKey: meta.ColumnElementKey}, diff --git a/ddl/partition.go b/ddl/partition.go index d8ae0276d1f90..80d5ddab1be9b 100644 --- a/ddl/partition.go +++ b/ddl/partition.go @@ -1797,6 +1797,7 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) ( elements = append(elements, &meta.Element{ID: idxInfo.ID, TypeKey: meta.IndexElementKey}) } } +<<<<<<< HEAD var rh *reorgHandler if w.concurrentDDL { sctx, err1 := w.sessPool.get() @@ -1820,6 +1821,15 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) ( rh = newReorgHandler(t, w.sess, w.concurrentDDL) } reorgInfo, err := getReorgInfoFromPartitions(d.jobContext(job), d, rh, job, tbl, physicalTableIDs, elements) +======= + sctx, err1 := w.sessPool.get() + if err1 != nil { + return ver, err1 + } + defer w.sessPool.put(sctx) + rh := newReorgHandler(newSession(sctx)) + reorgInfo, err := getReorgInfoFromPartitions(d.jobContext(job.ID), d, rh, job, dbInfo, tbl, physicalTableIDs, elements) +>>>>>>> eb35c773b51 (ddl: avoid commit conflicts when updating/delete from mysql.tidb_ddl_reorg. (#38738)) if err != nil || reorgInfo.first { // If we run reorg firstly, we should update the job snapshot version diff --git a/ddl/reorg.go b/ddl/reorg.go index 30a2e4dd755fb..4d7f5beacf8b3 100644 --- a/ddl/reorg.go +++ b/ddl/reorg.go @@ -747,6 +747,7 @@ func (r *reorgInfo) UpdateReorgMeta(startKey kv.Key, pool *sessionPool) (err err if err != nil { return } +<<<<<<< HEAD defer sess.rollback() txn, err := sess.txn() if err != nil { @@ -755,6 +756,10 @@ func (r *reorgInfo) UpdateReorgMeta(startKey kv.Key, pool *sessionPool) (err err } rh := newReorgHandler(meta.NewMeta(txn), sess, variable.EnableConcurrentDDL.Load()) err = rh.UpdateDDLReorgHandle(r.Job, startKey, r.EndKey, r.PhysicalTableID, r.currElement) +======= + rh := newReorgHandler(sess) + err = updateDDLReorgHandle(rh.s, r.Job.ID, startKey, r.EndKey, r.PhysicalTableID, r.currElement) +>>>>>>> eb35c773b51 (ddl: avoid commit conflicts when updating/delete from mysql.tidb_ddl_reorg. (#38738)) err1 := sess.commit() if err == nil { err = err1 @@ -764,13 +769,13 @@ func (r *reorgInfo) UpdateReorgMeta(startKey kv.Key, pool *sessionPool) (err err // reorgHandler is used to handle the reorg information duration reorganization DDL job. type reorgHandler struct { - m *meta.Meta s *session enableConcurrentDDL bool } // NewReorgHandlerForTest creates a new reorgHandler, only used in test. +<<<<<<< HEAD func NewReorgHandlerForTest(m *meta.Meta, sess sessionctx.Context) *reorgHandler { return newReorgHandler(m, newSession(sess), variable.EnableConcurrentDDL.Load()) } @@ -785,6 +790,14 @@ func (r *reorgHandler) UpdateDDLReorgHandle(job *model.Job, startKey, endKey kv. return updateDDLReorgHandle(r.s, job.ID, startKey, endKey, physicalTableID, element) } return r.m.UpdateDDLReorgHandle(job.ID, startKey, endKey, physicalTableID, element) +======= +func NewReorgHandlerForTest(sess sessionctx.Context) *reorgHandler { + return newReorgHandler(newSession(sess)) +} + +func newReorgHandler(sess *session) *reorgHandler { + return &reorgHandler{s: sess} +>>>>>>> eb35c773b51 (ddl: avoid commit conflicts when updating/delete from mysql.tidb_ddl_reorg. (#38738)) } // InitDDLReorgHandle initializes the job reorganization information. @@ -830,6 +843,20 @@ func CleanupDDLReorgHandles(job *model.Job, w *worker, t *meta.Meta) { } } +// CleanupDDLReorgHandles removes the job reorganization related handles. +func CleanupDDLReorgHandles(job *model.Job, s *session) { + if job != nil && !job.IsFinished() && !job.IsSynced() { + // Job is given, but it is neither finished nor synced; do nothing + return + } + + err := cleanDDLReorgHandles(s, job) + if err != nil { + // ignore error, cleanup is not that critical + logutil.BgLogger().Warn("Failed removing the DDL reorg entry in tidb_ddl_reorg", zap.String("job", job.String()), zap.Error(err)) + } +} + // GetDDLReorgHandle gets the latest processed DDL reorganize position. func (r *reorgHandler) GetDDLReorgHandle(job *model.Job) (element *meta.Element, startKey, endKey kv.Key, physicalTableID int64, err error) { if r.enableConcurrentDDL {