diff --git a/ddl/ddl_api_test.go b/ddl/ddl_api_test.go index 18db2dfa62170..90f0291ae2b35 100644 --- a/ddl/ddl_api_test.go +++ b/ddl/ddl_api_test.go @@ -15,12 +15,175 @@ package ddl_test import ( + "context" "testing" + "github.com/pingcap/tidb/ddl" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/testkit" "github.com/stretchr/testify/require" + "golang.org/x/exp/slices" ) +func TestGetDDLJobs(t *testing.T) { + store, clean := testkit.CreateMockStore(t) + defer clean() + + sess := testkit.NewTestKit(t, store).Session() + _, err := sess.Execute(context.Background(), "begin") + require.NoError(t, err) + + txn, err := sess.Txn(true) + require.NoError(t, err) + + cnt := 10 + jobs := make([]*model.Job, cnt) + var currJobs2 []*model.Job + for i := 0; i < cnt; i++ { + jobs[i] = &model.Job{ + ID: int64(i), + SchemaID: 1, + Type: model.ActionCreateTable, + } + err := addDDLJobs(txn, jobs[i]) + require.NoError(t, err) + + currJobs, err := ddl.GetAllDDLJobs(meta.NewMeta(txn)) + require.NoError(t, err) + require.Len(t, currJobs, i+1) + + currJobs2 = currJobs2[:0] + err = ddl.IterAllDDLJobs(txn, func(jobs []*model.Job) (b bool, e error) { + for _, job := range jobs { + if job.NotStarted() { + currJobs2 = append(currJobs2, job) + } else { + return true, nil + } + } + return false, nil + }) + require.NoError(t, err) + require.Len(t, currJobs2, i+1) + } + + currJobs, err := ddl.GetAllDDLJobs(meta.NewMeta(txn)) + require.NoError(t, err) + + for i, job := range jobs { + require.Equal(t, currJobs[i].ID, job.ID) + require.Equal(t, int64(1), job.SchemaID) + require.Equal(t, model.ActionCreateTable, job.Type) + } + require.Equal(t, currJobs2, currJobs) + + _, err = sess.Execute(context.Background(), "rollback") + require.NoError(t, err) +} + +func TestGetDDLJobsIsSort(t *testing.T) { + store, clean := testkit.CreateMockStore(t) + defer clean() + + sess := testkit.NewTestKit(t, store).Session() + _, err := sess.Execute(context.Background(), "begin") + require.NoError(t, err) + + txn, err := sess.Txn(true) + require.NoError(t, err) + + // insert 5 drop table jobs to DefaultJobListKey queue + enQueueDDLJobs(t, txn, model.ActionDropTable, 10, 15) + + // insert 5 create table jobs to DefaultJobListKey queue + enQueueDDLJobs(t, txn, model.ActionCreateTable, 0, 5) + + // insert add index jobs to AddIndexJobListKey queue + enQueueDDLJobs(t, txn, model.ActionAddIndex, 5, 10) + + currJobs, err := ddl.GetAllDDLJobs(meta.NewMeta(txn)) + require.NoError(t, err) + require.Len(t, currJobs, 15) + + isSort := slices.IsSortedFunc(currJobs, func(i, j *model.Job) bool { + return i.ID <= j.ID + }) + require.True(t, isSort) + + _, err = sess.Execute(context.Background(), "rollback") + require.NoError(t, err) +} + +func TestGetHistoryDDLJobs(t *testing.T) { + store, clean := testkit.CreateMockStore(t) + defer clean() + + // delete the internal DDL record. + err := kv.RunInNewTxn(kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL), store, false, func(ctx context.Context, txn kv.Transaction) error { + return meta.NewMeta(txn).ClearAllHistoryJob() + }) + + require.NoError(t, err) + + tk := testkit.NewTestKit(t, store) + sess := tk.Session() + tk.MustExec("begin") + + txn, err := sess.Txn(true) + require.NoError(t, err) + + m := meta.NewMeta(txn) + cnt := 11 + jobs := make([]*model.Job, cnt) + for i := 0; i < cnt; i++ { + jobs[i] = &model.Job{ + ID: int64(i), + SchemaID: 1, + Type: model.ActionCreateTable, + } + err = ddl.AddHistoryDDLJob(m, jobs[i], true) + require.NoError(t, err) + + historyJobs, err := ddl.GetLastNHistoryDDLJobs(m, ddl.DefNumHistoryJobs) + require.NoError(t, err) + + if i+1 > ddl.MaxHistoryJobs { + require.Len(t, historyJobs, ddl.MaxHistoryJobs) + } else { + require.Len(t, historyJobs, i+1) + } + } + + delta := cnt - ddl.MaxHistoryJobs + historyJobs, err := ddl.GetLastNHistoryDDLJobs(m, ddl.DefNumHistoryJobs) + require.NoError(t, err) + require.Len(t, historyJobs, ddl.MaxHistoryJobs) + + l := len(historyJobs) - 1 + for i, job := range historyJobs { + require.Equal(t, jobs[delta+l-i].ID, job.ID) + require.Equal(t, int64(1), job.SchemaID) + require.Equal(t, model.ActionCreateTable, job.Type) + } + + var historyJobs2 []*model.Job + err = ddl.IterHistoryDDLJobs(txn, func(jobs []*model.Job) (b bool, e error) { + for _, job := range jobs { + historyJobs2 = append(historyJobs2, job) + if len(historyJobs2) == ddl.DefNumHistoryJobs { + return true, nil + } + } + return false, nil + }) + require.NoError(t, err) + require.Equal(t, historyJobs, historyJobs2) + + tk.MustExec("rollback") +} + func TestIsJobRollbackable(t *testing.T) { cases := []struct { tp model.ActionType @@ -40,3 +203,15 @@ func TestIsJobRollbackable(t *testing.T) { require.Equal(t, ca.result, re) } } + +func enQueueDDLJobs(t *testing.T, txn kv.Transaction, jobType model.ActionType, start, end int) { + for i := start; i < end; i++ { + job := &model.Job{ + ID: int64(i), + SchemaID: 1, + Type: jobType, + } + err := addDDLJobs(txn, job) + require.NoError(t, err) + } +} diff --git a/ddl/ddl_test.go b/ddl/ddl_test.go index 83f83c6dabae6..160d238b51eb5 100644 --- a/ddl/ddl_test.go +++ b/ddl/ddl_test.go @@ -689,88 +689,6 @@ func TestReorg(t *testing.T) { } } -func TestGetDDLJobs(t *testing.T) { - store, clean := newMockStore(t) - defer clean() - - txn, err := store.Begin() - require.NoError(t, err) - - m := meta.NewMeta(txn) - cnt := 10 - jobs := make([]*model.Job, cnt) - var currJobs2 []*model.Job - for i := 0; i < cnt; i++ { - jobs[i] = &model.Job{ - ID: int64(i), - SchemaID: 1, - Type: model.ActionCreateTable, - } - err = m.EnQueueDDLJob(jobs[i]) - require.NoError(t, err) - - currJobs, err := GetAllDDLJobs(meta.NewMeta(txn)) - require.NoError(t, err) - require.Len(t, currJobs, i+1) - - currJobs2 = currJobs2[:0] - err = IterAllDDLJobs(txn, func(jobs []*model.Job) (b bool, e error) { - for _, job := range jobs { - if job.NotStarted() { - currJobs2 = append(currJobs2, job) - } else { - return true, nil - } - } - return false, nil - }) - require.NoError(t, err) - require.Len(t, currJobs2, i+1) - } - - currJobs, err := GetAllDDLJobs(meta.NewMeta(txn)) - require.NoError(t, err) - - for i, job := range jobs { - require.Equal(t, currJobs[i].ID, job.ID) - require.Equal(t, int64(1), job.SchemaID) - require.Equal(t, model.ActionCreateTable, job.Type) - } - require.Equal(t, currJobs2, currJobs) - - err = txn.Rollback() - require.NoError(t, err) -} - -func TestGetDDLJobsIsSort(t *testing.T) { - store, clean := newMockStore(t) - defer clean() - - txn, err := store.Begin() - require.NoError(t, err) - - // insert 5 drop table jobs to DefaultJobListKey queue - m := meta.NewMeta(txn) - enQueueDDLJobs(t, m, model.ActionDropTable, 10, 15) - - // insert 5 create table jobs to DefaultJobListKey queue - enQueueDDLJobs(t, m, model.ActionCreateTable, 0, 5) - - // insert add index jobs to AddIndexJobListKey queue - m = meta.NewMeta(txn, meta.AddIndexJobListKey) - enQueueDDLJobs(t, m, model.ActionAddIndex, 5, 10) - - currJobs, err := GetAllDDLJobs(meta.NewMeta(txn)) - require.NoError(t, err) - require.Len(t, currJobs, 15) - - isSort := isJobsSorted(currJobs) - require.True(t, isSort) - - err = txn.Rollback() - require.NoError(t, err) -} - func TestCancelJobs(t *testing.T) { store, clean := newMockStore(t) defer clean() @@ -884,64 +802,6 @@ func TestCancelJobs(t *testing.T) { require.NoError(t, err) } -func TestGetHistoryDDLJobs(t *testing.T) { - store, clean := newMockStore(t) - defer clean() - - txn, err := store.Begin() - require.NoError(t, err) - - m := meta.NewMeta(txn) - cnt := 11 - jobs := make([]*model.Job, cnt) - for i := 0; i < cnt; i++ { - jobs[i] = &model.Job{ - ID: int64(i), - SchemaID: 1, - Type: model.ActionCreateTable, - } - err = AddHistoryDDLJob(m, jobs[i], true) - require.NoError(t, err) - - historyJobs, err := GetLastNHistoryDDLJobs(m, DefNumHistoryJobs) - require.NoError(t, err) - - if i+1 > MaxHistoryJobs { - require.Len(t, historyJobs, MaxHistoryJobs) - } else { - require.Len(t, historyJobs, i+1) - } - } - - delta := cnt - MaxHistoryJobs - historyJobs, err := GetLastNHistoryDDLJobs(m, DefNumHistoryJobs) - require.NoError(t, err) - require.Len(t, historyJobs, MaxHistoryJobs) - - l := len(historyJobs) - 1 - for i, job := range historyJobs { - require.Equal(t, jobs[delta+l-i].ID, job.ID) - require.Equal(t, int64(1), job.SchemaID) - require.Equal(t, model.ActionCreateTable, job.Type) - } - - var historyJobs2 []*model.Job - err = IterHistoryDDLJobs(txn, func(jobs []*model.Job) (b bool, e error) { - for _, job := range jobs { - historyJobs2 = append(historyJobs2, job) - if len(historyJobs2) == DefNumHistoryJobs { - return true, nil - } - } - return false, nil - }) - require.NoError(t, err) - require.Equal(t, historyJobs, historyJobs2) - - err = txn.Rollback() - require.NoError(t, err) -} - func TestError(t *testing.T) { kvErrs := []*terror.Error{ dbterror.ErrDDLJobNotFound, @@ -967,27 +827,3 @@ func newMockStore(t *testing.T) (store kv.Storage, clean func()) { return } - -func isJobsSorted(jobs []*model.Job) bool { - if len(jobs) <= 1 { - return true - } - for i := 1; i < len(jobs); i++ { - if jobs[i].ID <= jobs[i-1].ID { - return false - } - } - return true -} - -func enQueueDDLJobs(t *testing.T, m *meta.Meta, jobType model.ActionType, start, end int) { - for i := start; i < end; i++ { - job := &model.Job{ - ID: int64(i), - SchemaID: 1, - Type: jobType, - } - err := m.EnQueueDDLJob(job) - require.NoError(t, err) - } -} diff --git a/ddl/reorg_test.go b/ddl/reorg_test.go index e6006f53a6f2d..252448bd5203b 100644 --- a/ddl/reorg_test.go +++ b/ddl/reorg_test.go @@ -49,7 +49,9 @@ func TestReorgOwner(t *testing.T) { ) err := d2.Start(pools.NewResourcePool(func() (pools.Resource, error) { - return testkit.NewTestKit(t, store).Session(), nil + session := testkit.NewTestKit(t, store).Session() + session.GetSessionVars().CommonGlobalLoaded = true + return session, nil }, 20, 20, 5)) require.NoError(t, err) @@ -90,8 +92,7 @@ func TestReorgOwner(t *testing.T) { testDropSchema(t, sctx, d1, dbInfo) - ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL) - err = kv.RunInNewTxn(ctx, store, false, func(ctx context.Context, txn kv.Transaction) error { + err = kv.RunInNewTxn(kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL), store, false, func(ctx context.Context, txn kv.Transaction) error { m := meta.NewMeta(txn) db, err1 := m.GetDatabase(dbInfo.ID) require.NoError(t, err1) diff --git a/ddl/schema_test.go b/ddl/schema_test.go index cdb071d5e0daf..ae9f85b0f12fa 100644 --- a/ddl/schema_test.go +++ b/ddl/schema_test.go @@ -290,7 +290,9 @@ func TestSchemaWaitJob(t *testing.T) { ddl.WithLease(testLease), ) err := d2.Start(pools.NewResourcePool(func() (pools.Resource, error) { - return testkit.NewTestKit(t, store).Session(), nil + session := testkit.NewTestKit(t, store).Session() + session.GetSessionVars().CommonGlobalLoaded = true + return session, nil }, 20, 20, 5)) require.NoError(t, err) defer func() { diff --git a/meta/meta.go b/meta/meta.go index 0ee38539c85b5..aef346c47b59e 100644 --- a/meta/meta.go +++ b/meta/meta.go @@ -1169,6 +1169,14 @@ func (m *Meta) UpdateDDLReorgHandle(job *model.Job, startKey, endKey kv.Key, phy return errors.Trace(err) } +// ClearAllHistoryJob clears all history jobs. **IT IS VERY DANGEROUS** +func (m *Meta) ClearAllHistoryJob() error { + if err := m.txn.HClear(mDDLJobHistoryKey); err != nil { + return errors.Trace(err) + } + return nil +} + // RemoveReorgElement removes the element of the reorganization information. func (m *Meta) RemoveReorgElement(job *model.Job) error { err := m.txn.HDel(mDDLJobReorgKey, m.reorgJobCurrentElement(job.ID))