diff --git a/ddl/column.go b/ddl/column.go index e9c805a385fd9..e1da483250f59 100644 --- a/ddl/column.go +++ b/ddl/column.go @@ -823,7 +823,6 @@ func doReorgWorkForModifyColumn(w *worker, d *ddlCtx, t *meta.Meta, job *model.J // enable: curl -X PUT -d "pause" "http://127.0.0.1:10080/fail/github.com/pingcap/tidb/ddl/mockDelayInModifyColumnTypeWithData". // disable: curl -X DELETE "http://127.0.0.1:10080/fail/github.com/pingcap/tidb/ddl/mockDelayInModifyColumnTypeWithData" failpoint.Inject("mockDelayInModifyColumnTypeWithData", func() {}) - // TODO: add session begin/commit handling including cleaning up tidb_ddl_reorg err = w.runReorgJob(rh, reorgInfo, tbl.Meta(), d.lease, func() (addIndexErr error) { defer util.Recover(metrics.LabelDDL, "onModifyColumn", func() { diff --git a/ddl/partition.go b/ddl/partition.go index bbf04c8756fbb..2c95f389707f9 100644 --- a/ddl/partition.go +++ b/ddl/partition.go @@ -1764,7 +1764,6 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) ( // and then run the reorg next time. return ver, errors.Trace(err) } - // TODO: Use an own session! err = w.runReorgJob(rh, reorgInfo, tbl.Meta(), d.lease, func() (dropIndexErr error) { defer tidbutil.Recover(metrics.LabelDDL, "onDropTablePartition", func() { diff --git a/ddl/reorg.go b/ddl/reorg.go index e1910dbc88e7b..b29b563abe14a 100644 --- a/ddl/reorg.go +++ b/ddl/reorg.go @@ -253,9 +253,10 @@ func (w *worker) runReorgJob(rh *reorgHandler, reorgInfo *reorgInfo, tblInfo *mo } updateBackfillProgress(w, reorgInfo, tblInfo, 0) - // TODO: Move the remove reorg handle into the worker - // Also never start a transaction for the session (rh?) - if err1 := rh.RemoveDDLReorgHandle(job, reorgInfo.elements); err1 != nil { + + // Do this is a separate transaction, since mysql.tidb_ddl_reorg may have been updated + // by the inner function and could result in commit conflicts. + if err1 := reorgInfo.deleteReorgMeta(w.sessPool); err1 != nil { logutil.BgLogger().Warn("[ddl] run reorg job done, removeDDLReorgHandle failed", zap.Error(err1)) return errors.Trace(err1) } @@ -644,7 +645,6 @@ func getReorgInfo(ctx *JobContext, d *ddlCtx, rh *reorgHandler, job *model.Job, failpoint.Inject("errorUpdateReorgHandle", func() (*reorgInfo, error) { return &info, errors.New("occur an error when update reorg handle") }) - // TODO: Wrap these two into a single transaction? err = rh.RemoveDDLReorgHandle(job, elements) if err != nil { return &info, errors.Trace(err) @@ -755,6 +755,35 @@ func getReorgInfoFromPartitions(ctx *JobContext, d *ddlCtx, rh *reorgHandler, jo return &info, nil } +func (r *reorgInfo) deleteReorgMeta(pool *sessionPool) error { + if len(r.elements) == 0 { + return nil + } + se, err := pool.get() + if err != nil { + return errors.Trace(err) + } + defer pool.put(se) + + sess := newSession(se) + err = sess.begin() + if err != nil { + return errors.Trace(err) + } + txn, err := sess.txn() + if err != nil { + sess.rollback() + return errors.Trace(err) + } + rh := newReorgHandler(meta.NewMeta(txn), sess, variable.EnableConcurrentDDL.Load()) + err = rh.RemoveDDLReorgHandle(r.Job, r.elements) + err1 := sess.commit() + if err == nil { + err = err1 + } + return errors.Trace(err) +} + func (r *reorgInfo) UpdateReorgMeta(startKey kv.Key, pool *sessionPool) (err error) { if startKey == nil && r.EndKey == nil { return nil diff --git a/executor/oomtest/oom_test.go b/executor/oomtest/oom_test.go index 4db5cb618e81b..fc95bb47ceab8 100644 --- a/executor/oomtest/oom_test.go +++ b/executor/oomtest/oom_test.go @@ -44,7 +44,6 @@ func TestMain(m *testing.M) { } func TestMemTracker4UpdateExec(t *testing.T) { - t.Skip("TODO: remove CI hacking") store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) @@ -64,7 +63,6 @@ func TestMemTracker4UpdateExec(t *testing.T) { } func TestMemTracker4InsertAndReplaceExec(t *testing.T) { - t.Skip("TODO: remove CI hacking") store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) @@ -135,7 +133,6 @@ func TestMemTracker4InsertAndReplaceExec(t *testing.T) { } func TestMemTracker4DeleteExec(t *testing.T) { - t.Skip("TODO: remove CI hacking") store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store)