Skip to content

Commit

Permalink
binlog: allow multiple ddl targets (#30904)
Browse files Browse the repository at this point in the history
  • Loading branch information
hicqu authored Dec 24, 2021
1 parent 404895c commit 6e6db1f
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 3 deletions.
7 changes: 4 additions & 3 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -798,21 +798,22 @@ func onRenameTables(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error
return ver, errors.Trace(err)
}

tblInfo := &model.TableInfo{}
var tblInfos = make([]*model.TableInfo, 0, len(tableNames))
var err error
for i, oldSchemaID := range oldSchemaIDs {
job.TableID = tableIDs[i]
ver, tblInfo, err = checkAndRenameTables(t, job, oldSchemaID, newSchemaIDs[i], oldSchemaNames[i], tableNames[i])
ver, tblInfo, err := checkAndRenameTables(t, job, oldSchemaID, newSchemaIDs[i], oldSchemaNames[i], tableNames[i])
if err != nil {
return ver, errors.Trace(err)
}
tblInfos = append(tblInfos, tblInfo)
}

ver, err = updateSchemaVersion(t, job)
if err != nil {
return ver, errors.Trace(err)
}
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
job.FinishMultipleTableJob(model.JobStateDone, model.StatePublic, ver, tblInfos)
return ver, nil
}

Expand Down
67 changes: 67 additions & 0 deletions ddl/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package ddl

import (
"context"
"fmt"
"testing"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -66,6 +67,24 @@ func testRenameTable(t *testing.T, ctx sessionctx.Context, d *ddl, newSchemaID,
return job
}

func testRenameTables(
t *testing.T, ctx sessionctx.Context, d *ddl,
oldSchemaIDs, newSchemaIDs []int64, newTableNames []*model.CIStr,
oldTableIDs []int64, oldSchemaNames []*model.CIStr,
) *model.Job {
job := &model.Job{
Type: model.ActionRenameTables,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{oldSchemaIDs, newSchemaIDs, newTableNames, oldTableIDs, oldSchemaNames},
}
err := d.doDDLJob(ctx, job)
require.NoError(t, err)

v := getSchemaVerT(t, ctx)
checkHistoryJobArgsT(t, ctx, job.ID, &historyJobArgs{ver: v, tbl: nil})
return job
}

func testLockTable(t *testing.T, ctx sessionctx.Context, d *ddl, newSchemaID int64, tblInfo *model.TableInfo, lockTp model.TableLockType) *model.Job {
arg := &lockTablesArg{
LockTables: []model.TableLockTpInfo{{SchemaID: newSchemaID, TableID: tblInfo.ID, Tp: lockTp}},
Expand Down Expand Up @@ -326,3 +345,51 @@ func testAlterNoCacheTable(t *testing.T, ctx sessionctx.Context, d *ddl, newSche
checkHistoryJobArgsT(t, ctx, job.ID, &historyJobArgs{ver: v})
return job
}

func TestRenameTables(t *testing.T) {
store, err := mockstore.NewMockStore()
require.NoError(t, err)
ddl, err := testNewDDLAndStart(
context.Background(),
WithStore(store),
WithLease(testLease),
)
require.NoError(t, err)

dbInfo, err := testSchemaInfo(ddl, "test_table")
require.NoError(t, err)
testCreateSchemaT(t, testNewContext(ddl), ddl, dbInfo)

ctx := testNewContext(ddl)
var tblInfos = make([]*model.TableInfo, 0, 2)
var newTblInfos = make([]*model.TableInfo, 0, 2)
for i := 1; i < 3; i++ {
tableName := fmt.Sprintf("t%d", i)
tblInfo, err := testTableInfo(ddl, tableName, 3)
require.NoError(t, err)
job := testCreateTableT(t, ctx, ddl, dbInfo, tblInfo)
testCheckTableStateT(t, ddl, dbInfo, tblInfo, model.StatePublic)
testCheckJobDoneT(t, ddl, job, true)
tblInfos = append(tblInfos, tblInfo)

newTableName := fmt.Sprintf("tt%d", i)
tblInfo, err = testTableInfo(ddl, newTableName, 3)
require.NoError(t, err)
newTblInfos = append(newTblInfos, tblInfo)
}

job := testRenameTables(
t, ctx, ddl,
[]int64{dbInfo.ID, dbInfo.ID},
[]int64{dbInfo.ID, dbInfo.ID},
[]*model.CIStr{&newTblInfos[0].Name, &newTblInfos[1].Name},
[]int64{tblInfos[0].ID, tblInfos[1].ID},
[]*model.CIStr{&dbInfo.Name, &dbInfo.Name},
)

txn, _ := ctx.Txn(true)
historyJob, _ := meta.NewMeta(txn).GetHistoryDDLJob(job.ID)
wantTblInfos := historyJob.BinlogInfo.MultipleTableInfos
require.Equal(t, wantTblInfos[0].Name.L, "tt1")
require.Equal(t, wantTblInfos[1].Name.L, "tt2")
}
14 changes: 14 additions & 0 deletions parser/model/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,9 @@ type HistoryInfo struct {
DBInfo *DBInfo
TableInfo *TableInfo
FinishedTS uint64

// MultipleTableInfos is like TableInfo but only for operations updating multiple tables.
MultipleTableInfos []*TableInfo
}

// AddDBInfo adds schema version and schema information that are used for binlog.
Expand All @@ -196,6 +199,7 @@ func (h *HistoryInfo) Clean() {
h.SchemaVersion = 0
h.DBInfo = nil
h.TableInfo = nil
h.MultipleTableInfos = nil
}

// DDLReorgMeta is meta info of DDL reorganization.
Expand Down Expand Up @@ -279,6 +283,16 @@ func (job *Job) FinishTableJob(jobState JobState, schemaState SchemaState, ver i
job.BinlogInfo.AddTableInfo(ver, tblInfo)
}

// FinishMultipleTableJob is called when a job is finished.
// It updates the job's state information and adds tblInfos to the binlog.
func (job *Job) FinishMultipleTableJob(jobState JobState, schemaState SchemaState, ver int64, tblInfos []*TableInfo) {
job.State = jobState
job.SchemaState = schemaState
job.BinlogInfo.SchemaVersion = ver
job.BinlogInfo.MultipleTableInfos = tblInfos
job.BinlogInfo.TableInfo = tblInfos[len(tblInfos)-1]
}

// FinishDBJob is called when a job is finished.
// It updates the job's state information and adds dbInfo the binlog.
func (job *Job) FinishDBJob(jobState JobState, schemaState SchemaState, ver int64, dbInfo *DBInfo) {
Expand Down

0 comments on commit 6e6db1f

Please sign in to comment.