From aa6644eaae89ba0f34b53bcea59557a16f961809 Mon Sep 17 00:00:00 2001 From: qupeng Date: Mon, 20 Dec 2021 18:42:39 +0800 Subject: [PATCH 1/4] binlog: allow multiple ddl targets Signed-off-by: qupeng --- ddl/table.go | 7 ++++--- parser/model/ddl.go | 23 +++++++++++++++++++++++ 2 files changed, 27 insertions(+), 3 deletions(-) diff --git a/ddl/table.go b/ddl/table.go index 83f7ad0b0e58a..e160217caf4ff 100644 --- a/ddl/table.go +++ b/ddl/table.go @@ -798,21 +798,22 @@ func onRenameTables(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error return ver, errors.Trace(err) } - tblInfo := &model.TableInfo{} + var tblInfos = make([]*model.TableInfo, 0, len(tableNames)) var err error for i, oldSchemaID := range oldSchemaIDs { job.TableID = tableIDs[i] - ver, tblInfo, err = checkAndRenameTables(t, job, oldSchemaID, newSchemaIDs[i], oldSchemaNames[i], tableNames[i]) + ver, tblInfo, err := checkAndRenameTables(t, job, oldSchemaID, newSchemaIDs[i], oldSchemaNames[i], tableNames[i]) if err != nil { return ver, errors.Trace(err) } + tblInfos = append(tblInfos, tblInfo) } ver, err = updateSchemaVersion(t, job) if err != nil { return ver, errors.Trace(err) } - job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo) + job.FinishMultipleTableJob(model.JobStateDone, model.StatePublic, ver, tblInfos) return ver, nil } diff --git a/parser/model/ddl.go b/parser/model/ddl.go index c61372b55e263..9679904eac4c9 100644 --- a/parser/model/ddl.go +++ b/parser/model/ddl.go @@ -175,6 +175,11 @@ type HistoryInfo struct { DBInfo *DBInfo TableInfo *TableInfo FinishedTS uint64 + + // MultipleDBInfo is like DBInfo but only for operations updating multiple DBs. + MultipleDBInfo []*DBInfo + // MultipleTableInfo is like TableInfo but only for operations updating multiple DBs. + MultipleTableInfo []*TableInfo } // AddDBInfo adds schema version and schema information that are used for binlog. @@ -279,6 +284,15 @@ func (job *Job) FinishTableJob(jobState JobState, schemaState SchemaState, ver i job.BinlogInfo.AddTableInfo(ver, tblInfo) } +// FinishMultipleTableJob is called when a job is finished. +// It updates the job's state information and adds tblInfos to the binlog. +func (job *Job) FinishMultipleTableJob(jobState JobState, schemaState SchemaState, ver int64, tblInfos []*TableInfo) { + job.State = jobState + job.SchemaState = schemaState + job.BinlogInfo.SchemaVersion = ver + job.BinlogInfo.MultipleTableInfo = tblInfos +} + // FinishDBJob is called when a job is finished. // It updates the job's state information and adds dbInfo the binlog. func (job *Job) FinishDBJob(jobState JobState, schemaState SchemaState, ver int64, dbInfo *DBInfo) { @@ -287,6 +301,15 @@ func (job *Job) FinishDBJob(jobState JobState, schemaState SchemaState, ver int6 job.BinlogInfo.AddDBInfo(ver, dbInfo) } +// FinishDBJob is called when a job is finished. +// It updates the job's state information and adds dbInfos the binlog. +func (job *Job) FinishMultipleDBJob(jobState JobState, schemaState SchemaState, ver int64, dbInfos []*DBInfo) { + job.State = jobState + job.SchemaState = schemaState + job.BinlogInfo.SchemaVersion = ver + job.BinlogInfo.MultipleDBInfo = dbInfos +} + // TSConvert2Time converts timestamp to time. func TSConvert2Time(ts uint64) time.Time { t := int64(ts >> 18) // 18 is for the logical time. From a2ef2712dc3d20e358a8b7007ccbd22eae7b73cc Mon Sep 17 00:00:00 2001 From: qupeng Date: Wed, 22 Dec 2021 14:13:46 +0800 Subject: [PATCH 2/4] add tests Signed-off-by: qupeng --- ddl/table_test.go | 67 +++++++++++++++++++++++++++++++++++++++++++++ parser/model/ddl.go | 9 ------ 2 files changed, 67 insertions(+), 9 deletions(-) diff --git a/ddl/table_test.go b/ddl/table_test.go index f366c090b1686..1353da26bab64 100644 --- a/ddl/table_test.go +++ b/ddl/table_test.go @@ -16,6 +16,7 @@ package ddl import ( "context" + "fmt" "testing" "github.com/pingcap/errors" @@ -66,6 +67,24 @@ func testRenameTable(t *testing.T, ctx sessionctx.Context, d *ddl, newSchemaID, return job } +func testRenameTables( + t *testing.T, ctx sessionctx.Context, d *ddl, + oldSchemaIDs, newSchemaIDs []int64, newTableNames []*model.CIStr, + oldTableIDs []int64, oldSchemaNames []*model.CIStr, +) *model.Job { + job := &model.Job{ + Type: model.ActionRenameTables, + BinlogInfo: &model.HistoryInfo{}, + Args: []interface{}{oldSchemaIDs, newSchemaIDs, newTableNames, oldTableIDs, oldSchemaNames}, + } + err := d.doDDLJob(ctx, job) + require.NoError(t, err) + + v := getSchemaVerT(t, ctx) + checkHistoryJobArgsT(t, ctx, job.ID, &historyJobArgs{ver: v, tbl: nil}) + return job +} + func testLockTable(t *testing.T, ctx sessionctx.Context, d *ddl, newSchemaID int64, tblInfo *model.TableInfo, lockTp model.TableLockType) *model.Job { arg := &lockTablesArg{ LockTables: []model.TableLockTpInfo{{SchemaID: newSchemaID, TableID: tblInfo.ID, Tp: lockTp}}, @@ -326,3 +345,51 @@ func testAlterNoCacheTable(t *testing.T, ctx sessionctx.Context, d *ddl, newSche checkHistoryJobArgsT(t, ctx, job.ID, &historyJobArgs{ver: v}) return job } + +func TestRenameTables(t *testing.T) { + store, err := mockstore.NewMockStore() + require.NoError(t, err) + ddl, err := testNewDDLAndStart( + context.Background(), + WithStore(store), + WithLease(testLease), + ) + require.NoError(t, err) + + dbInfo, err := testSchemaInfo(ddl, "test_table") + require.NoError(t, err) + testCreateSchemaT(t, testNewContext(ddl), ddl, dbInfo) + + ctx := testNewContext(ddl) + var tblInfos = make([]*model.TableInfo, 0, 2) + var newTblInfos = make([]*model.TableInfo, 0, 2) + for i := 1; i < 3; i++ { + tableName := fmt.Sprintf("t%d", i) + tblInfo, err := testTableInfo(ddl, tableName, 3) + require.NoError(t, err) + job := testCreateTableT(t, ctx, ddl, dbInfo, tblInfo) + testCheckTableStateT(t, ddl, dbInfo, tblInfo, model.StatePublic) + testCheckJobDoneT(t, ddl, job, true) + tblInfos = append(tblInfos, tblInfo) + + newTableName := fmt.Sprintf("tt%d", i) + tblInfo, err = testTableInfo(ddl, newTableName, 3) + require.NoError(t, err) + newTblInfos = append(newTblInfos, tblInfo) + } + + job := testRenameTables( + t, ctx, ddl, + []int64{dbInfo.ID, dbInfo.ID}, + []int64{dbInfo.ID, dbInfo.ID}, + []*model.CIStr{&newTblInfos[0].Name, &newTblInfos[1].Name}, + []int64{tblInfos[0].ID, tblInfos[1].ID}, + []*model.CIStr{&dbInfo.Name, &dbInfo.Name}, + ) + + txn, _ := ctx.Txn(true) + historyJob, _ := meta.NewMeta(txn).GetHistoryDDLJob(job.ID) + wantTblInfos := historyJob.BinlogInfo.MultipleTableInfo + require.Equal(t, wantTblInfos[0].Name.L, "tt1") + require.Equal(t, wantTblInfos[1].Name.L, "tt2") +} diff --git a/parser/model/ddl.go b/parser/model/ddl.go index 9679904eac4c9..77692e5d5c810 100644 --- a/parser/model/ddl.go +++ b/parser/model/ddl.go @@ -301,15 +301,6 @@ func (job *Job) FinishDBJob(jobState JobState, schemaState SchemaState, ver int6 job.BinlogInfo.AddDBInfo(ver, dbInfo) } -// FinishDBJob is called when a job is finished. -// It updates the job's state information and adds dbInfos the binlog. -func (job *Job) FinishMultipleDBJob(jobState JobState, schemaState SchemaState, ver int64, dbInfos []*DBInfo) { - job.State = jobState - job.SchemaState = schemaState - job.BinlogInfo.SchemaVersion = ver - job.BinlogInfo.MultipleDBInfo = dbInfos -} - // TSConvert2Time converts timestamp to time. func TSConvert2Time(ts uint64) time.Time { t := int64(ts >> 18) // 18 is for the logical time. From 33ce0135b4b48fe2bd75a5e90a72cbd001826f6f Mon Sep 17 00:00:00 2001 From: qupeng Date: Wed, 22 Dec 2021 14:29:46 +0800 Subject: [PATCH 3/4] address comments Signed-off-by: qupeng --- parser/model/ddl.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/parser/model/ddl.go b/parser/model/ddl.go index 77692e5d5c810..2ad98980410c0 100644 --- a/parser/model/ddl.go +++ b/parser/model/ddl.go @@ -176,8 +176,6 @@ type HistoryInfo struct { TableInfo *TableInfo FinishedTS uint64 - // MultipleDBInfo is like DBInfo but only for operations updating multiple DBs. - MultipleDBInfo []*DBInfo // MultipleTableInfo is like TableInfo but only for operations updating multiple DBs. MultipleTableInfo []*TableInfo } From 3bf577c5063b27fb5b38acd3201382df36fcf15b Mon Sep 17 00:00:00 2001 From: qupeng Date: Wed, 22 Dec 2021 18:38:36 +0800 Subject: [PATCH 4/4] address comments Signed-off-by: qupeng --- ddl/table_test.go | 2 +- parser/model/ddl.go | 8 +++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/ddl/table_test.go b/ddl/table_test.go index 1353da26bab64..78be85a8dbd97 100644 --- a/ddl/table_test.go +++ b/ddl/table_test.go @@ -389,7 +389,7 @@ func TestRenameTables(t *testing.T) { txn, _ := ctx.Txn(true) historyJob, _ := meta.NewMeta(txn).GetHistoryDDLJob(job.ID) - wantTblInfos := historyJob.BinlogInfo.MultipleTableInfo + wantTblInfos := historyJob.BinlogInfo.MultipleTableInfos require.Equal(t, wantTblInfos[0].Name.L, "tt1") require.Equal(t, wantTblInfos[1].Name.L, "tt2") } diff --git a/parser/model/ddl.go b/parser/model/ddl.go index 2ad98980410c0..9716cea38cd23 100644 --- a/parser/model/ddl.go +++ b/parser/model/ddl.go @@ -176,8 +176,8 @@ type HistoryInfo struct { TableInfo *TableInfo FinishedTS uint64 - // MultipleTableInfo is like TableInfo but only for operations updating multiple DBs. - MultipleTableInfo []*TableInfo + // MultipleTableInfos is like TableInfo but only for operations updating multiple tables. + MultipleTableInfos []*TableInfo } // AddDBInfo adds schema version and schema information that are used for binlog. @@ -199,6 +199,7 @@ func (h *HistoryInfo) Clean() { h.SchemaVersion = 0 h.DBInfo = nil h.TableInfo = nil + h.MultipleTableInfos = nil } // DDLReorgMeta is meta info of DDL reorganization. @@ -288,7 +289,8 @@ func (job *Job) FinishMultipleTableJob(jobState JobState, schemaState SchemaStat job.State = jobState job.SchemaState = schemaState job.BinlogInfo.SchemaVersion = ver - job.BinlogInfo.MultipleTableInfo = tblInfos + job.BinlogInfo.MultipleTableInfos = tblInfos + job.BinlogInfo.TableInfo = tblInfos[len(tblInfos)-1] } // FinishDBJob is called when a job is finished.