From 4f30a14256c1d51adcd4fdcef53ba837fa0c8438 Mon Sep 17 00:00:00 2001 From: xhe Date: Fri, 31 Dec 2021 17:13:50 +0800 Subject: [PATCH] ddl: support batch create table (#28763) --- br/pkg/gluetidb/glue.go | 4 +- ddl/db_test.go | 38 ++++++++ ddl/ddl.go | 22 ++--- ddl/ddl_api.go | 204 ++++++++++++++++++++++++++++++++-------- ddl/ddl_worker.go | 27 +++++- ddl/table.go | 117 +++++++++++++++++------ ddl/table_test.go | 47 +++++++++ executor/brie.go | 4 +- executor/executor.go | 10 ++ infoschema/builder.go | 42 ++++++--- parser/model/ddl.go | 11 +++ 11 files changed, 430 insertions(+), 96 deletions(-) diff --git a/br/pkg/gluetidb/glue.go b/br/pkg/gluetidb/glue.go index c2cb64d21328a..1116e26b7e0ba 100644 --- a/br/pkg/gluetidb/glue.go +++ b/br/pkg/gluetidb/glue.go @@ -125,7 +125,7 @@ func (gs *tidbSession) CreateDatabase(ctx context.Context, schema *model.DBInfo) if len(schema.Charset) == 0 { schema.Charset = mysql.DefaultCharset } - return d.CreateSchemaWithInfo(gs.se, schema, ddl.OnExistIgnore, true) + return d.CreateSchemaWithInfo(gs.se, schema, ddl.OnExistIgnore) } // CreateTable implements glue.Session. @@ -143,7 +143,7 @@ func (gs *tidbSession) CreateTable(ctx context.Context, dbName model.CIStr, tabl newPartition.Definitions = append([]model.PartitionDefinition{}, table.Partition.Definitions...) table.Partition = &newPartition } - return d.CreateTableWithInfo(gs.se, dbName, table, ddl.OnExistIgnore, true) + return d.CreateTableWithInfo(gs.se, dbName, table, ddl.OnExistIgnore) } // Close implements glue.Session. diff --git a/ddl/db_test.go b/ddl/db_test.go index 309ca8a03e736..cc854ff758c0a 100644 --- a/ddl/db_test.go +++ b/ddl/db_test.go @@ -7641,3 +7641,41 @@ func (s *testDBSuite8) TestCreateTextAdjustLen(c *C) { ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")) tk.MustExec("drop table if exists t") } + +func (s *testDBSuite2) TestCreateTables(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists tables_1") + tk.MustExec("drop table if exists tables_2") + tk.MustExec("drop table if exists tables_3") + + d := s.dom.DDL() + infos := []*model.TableInfo{} + infos = append(infos, &model.TableInfo{ + Name: model.NewCIStr("tables_1"), + }) + infos = append(infos, &model.TableInfo{ + Name: model.NewCIStr("tables_2"), + }) + infos = append(infos, &model.TableInfo{ + Name: model.NewCIStr("tables_3"), + }) + + // correct name + err := d.BatchCreateTableWithInfo(tk.Se, model.NewCIStr("test"), infos, ddl.OnExistError) + c.Check(err, IsNil) + + tk.MustQuery("show tables like '%tables_%'").Check(testkit.Rows("tables_1", "tables_2", "tables_3")) + job := tk.MustQuery("admin show ddl jobs").Rows()[0] + c.Assert(job[1], Equals, "test") + c.Assert(job[2], Equals, "tables_1,tables_2,tables_3") + c.Assert(job[3], Equals, "create tables") + c.Assert(job[4], Equals, "public") + // FIXME: we must change column type to give multiple id + // c.Assert(job[6], Matches, "[^,]+,[^,]+,[^,]+") + + // duplicated name + infos[1].Name = model.NewCIStr("tables_1") + err = d.BatchCreateTableWithInfo(tk.Se, model.NewCIStr("test"), infos, ddl.OnExistError) + c.Check(terror.ErrorEqual(err, infoschema.ErrTableExists), IsTrue) +} diff --git a/ddl/ddl.go b/ddl/ddl.go index 2277e35ac4b0b..bf1762ece2ff6 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -123,32 +123,28 @@ type DDL interface { // CreateSchemaWithInfo creates a database (schema) given its database info. // - // If `tryRetainID` is true, this method will try to keep the database ID specified in - // the `info` rather than generating new ones. This is just a hint though, if the ID collides - // with an existing database a new ID will always be used. - // // WARNING: the DDL owns the `info` after calling this function, and will modify its fields // in-place. If you want to keep using `info`, please call Clone() first. CreateSchemaWithInfo( ctx sessionctx.Context, info *model.DBInfo, - onExist OnExist, - tryRetainID bool) error + onExist OnExist) error // CreateTableWithInfo creates a table, view or sequence given its table info. // - // If `tryRetainID` is true, this method will try to keep the table ID specified in the `info` - // rather than generating new ones. This is just a hint though, if the ID collides with an - // existing table a new ID will always be used. - // // WARNING: the DDL owns the `info` after calling this function, and will modify its fields // in-place. If you want to keep using `info`, please call Clone() first. CreateTableWithInfo( ctx sessionctx.Context, schema model.CIStr, info *model.TableInfo, - onExist OnExist, - tryRetainID bool) error + onExist OnExist) error + + // BatchCreateTableWithInfo is like CreateTableWithInfo, but can handle multiple tables. + BatchCreateTableWithInfo(ctx sessionctx.Context, + schema model.CIStr, + info []*model.TableInfo, + onExist OnExist) error // Start campaigns the owner and starts workers. // ctxPool is used for the worker's delRangeManager and creates sessions. @@ -253,10 +249,10 @@ func asyncNotifyEvent(d *ddlCtx, e *util.Event) { case d.ddlEventCh <- e: return default: - logutil.BgLogger().Warn("[ddl] fail to notify DDL event", zap.String("event", e.String())) time.Sleep(time.Microsecond * 10) } } + logutil.BgLogger().Warn("[ddl] fail to notify DDL event", zap.String("event", e.String())) } } diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index 0cac506710468..c1a75e66bea35 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -94,14 +94,13 @@ func (d *ddl) CreateSchema(ctx sessionctx.Context, schema model.CIStr, charsetIn return errors.Trace(err) } - return d.CreateSchemaWithInfo(ctx, dbInfo, OnExistError, false /*tryRetainID*/) + return d.CreateSchemaWithInfo(ctx, dbInfo, OnExistError) } func (d *ddl) CreateSchemaWithInfo( ctx sessionctx.Context, dbInfo *model.DBInfo, onExist OnExist, - tryRetainID bool, ) error { is := d.GetInfoSchemaWithInterceptor(ctx) _, ok := is.SchemaByName(dbInfo.Name) @@ -2008,7 +2007,7 @@ func (d *ddl) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt) (err e onExist = OnExistIgnore } - return d.CreateTableWithInfo(ctx, schema.Name, tbInfo, onExist, false /*tryRetainID*/) + return d.CreateTableWithInfo(ctx, schema.Name, tbInfo, onExist) } func setTemporaryType(ctx sessionctx.Context, tbInfo *model.TableInfo, s *ast.CreateTableStmt) error { @@ -2027,17 +2026,21 @@ func setTemporaryType(ctx sessionctx.Context, tbInfo *model.TableInfo, s *ast.Cr return nil } -func (d *ddl) CreateTableWithInfo( +// createTableWithInfoJob returns the table creation job. +// WARNING: it may return a nil job, which means you don't need to submit any DDL job. +// WARNING!!!: if retainID == true, it will not allocate ID by itself. That means if the caller +// can not promise ID is unique, then we got inconsistency. +func (d *ddl) createTableWithInfoJob( ctx sessionctx.Context, dbName model.CIStr, tbInfo *model.TableInfo, onExist OnExist, - tryRetainID bool, -) (err error) { + retainID bool, +) (job *model.Job, err error) { is := d.GetInfoSchemaWithInterceptor(ctx) schema, ok := is.SchemaByName(dbName) if !ok { - return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(dbName) + return nil, infoschema.ErrDatabaseNotExists.GenWithStackByArgs(dbName) } var oldViewTblID int64 @@ -2046,7 +2049,7 @@ func (d *ddl) CreateTableWithInfo( switch onExist { case OnExistIgnore: ctx.GetSessionVars().StmtCtx.AppendNote(err) - return nil + return nil, nil case OnExistReplace: // only CREATE OR REPLACE VIEW is supported at the moment. if tbInfo.View != nil { @@ -2055,27 +2058,28 @@ func (d *ddl) CreateTableWithInfo( break } // The object to replace isn't a view. - return ErrWrongObject.GenWithStackByArgs(dbName, tbInfo.Name, "VIEW") + return nil, ErrWrongObject.GenWithStackByArgs(dbName, tbInfo.Name, "VIEW") } - return err + return nil, err default: - return err + return nil, err } } - // FIXME: Implement `tryRetainID` - if err := d.assignTableID(tbInfo); err != nil { - return errors.Trace(err) - } + if !retainID { + if err := d.assignTableID(tbInfo); err != nil { + return nil, errors.Trace(err) + } - if tbInfo.Partition != nil { - if err := d.assignPartitionIDs(tbInfo.Partition.Definitions); err != nil { - return errors.Trace(err) + if tbInfo.Partition != nil { + if err := d.assignPartitionIDs(tbInfo.Partition.Definitions); err != nil { + return nil, errors.Trace(err) + } } } if err := checkTableInfoValidExtra(tbInfo); err != nil { - return err + return nil, err } var actionType model.ActionType @@ -2090,7 +2094,7 @@ func (d *ddl) CreateTableWithInfo( actionType = model.ActionCreateTable } - job := &model.Job{ + job = &model.Job{ SchemaID: schema.ID, TableID: tbInfo.ID, SchemaName: schema.Name.L, @@ -2098,6 +2102,46 @@ func (d *ddl) CreateTableWithInfo( BinlogInfo: &model.HistoryInfo{}, Args: args, } + return job, nil +} + +func (d *ddl) createTableWithInfoPost( + ctx sessionctx.Context, + tbInfo *model.TableInfo, + schemaID int64, +) error { + var err error + d.preSplitAndScatter(ctx, tbInfo, tbInfo.GetPartitionInfo()) + if tbInfo.AutoIncID > 1 { + // Default tableAutoIncID base is 0. + // If the first ID is expected to greater than 1, we need to do rebase. + newEnd := tbInfo.AutoIncID - 1 + if err = d.handleAutoIncID(tbInfo, schemaID, newEnd, autoid.RowIDAllocType); err != nil { + return errors.Trace(err) + } + } + if tbInfo.AutoRandID > 1 { + // Default tableAutoRandID base is 0. + // If the first ID is expected to greater than 1, we need to do rebase. + newEnd := tbInfo.AutoRandID - 1 + err = d.handleAutoIncID(tbInfo, schemaID, newEnd, autoid.AutoRandomType) + } + return err +} + +func (d *ddl) CreateTableWithInfo( + ctx sessionctx.Context, + dbName model.CIStr, + tbInfo *model.TableInfo, + onExist OnExist, +) (err error) { + job, err := d.createTableWithInfoJob(ctx, dbName, tbInfo, onExist, false) + if err != nil { + return err + } + if job == nil { + return nil + } err = d.doDDLJob(ctx, job) if err != nil { @@ -2106,26 +2150,112 @@ func (d *ddl) CreateTableWithInfo( ctx.GetSessionVars().StmtCtx.AppendNote(err) err = nil } - } else if actionType == model.ActionCreateTable { - d.preSplitAndScatter(ctx, tbInfo, tbInfo.GetPartitionInfo()) - if tbInfo.AutoIncID > 1 { - // Default tableAutoIncID base is 0. - // If the first ID is expected to greater than 1, we need to do rebase. - newEnd := tbInfo.AutoIncID - 1 - if err = d.handleAutoIncID(tbInfo, schema.ID, newEnd, autoid.RowIDAllocType); err != nil { - return errors.Trace(err) + } else { + err = d.createTableWithInfoPost(ctx, tbInfo, job.SchemaID) + } + + err = d.callHookOnChanged(err) + return errors.Trace(err) +} + +func (d *ddl) BatchCreateTableWithInfo(ctx sessionctx.Context, + dbName model.CIStr, + infos []*model.TableInfo, + onExist OnExist) error { + jobs := &model.Job{ + BinlogInfo: &model.HistoryInfo{}, + } + args := make([]*model.TableInfo, 0, len(infos)) + + var err error + + // 1. counts how many IDs are there + // 2. if there is any duplicated table name + totalID := 0 + duplication := make(map[string]struct{}) + for _, info := range infos { + if _, ok := duplication[info.Name.L]; ok { + err = infoschema.ErrTableExists.FastGenByArgs("can not batch create tables with same name") + if onExist == OnExistIgnore && infoschema.ErrTableExists.Equal(err) { + ctx.GetSessionVars().StmtCtx.AppendNote(err) + err = nil } } - if tbInfo.AutoRandID > 1 { - // Default tableAutoRandID base is 0. - // If the first ID is expected to greater than 1, we need to do rebase. - newEnd := tbInfo.AutoRandID - 1 - err = d.handleAutoIncID(tbInfo, schema.ID, newEnd, autoid.AutoRandomType) + if err != nil { + return errors.Trace(err) + } + + duplication[info.Name.L] = struct{}{} + + totalID += 1 + parts := info.GetPartitionInfo() + if parts != nil { + totalID += len(parts.Definitions) } } - err = d.callHookOnChanged(err) - return errors.Trace(err) + genIDs, err := d.genGlobalIDs(totalID) + if err != nil { + return errors.Trace(err) + } + + for _, info := range infos { + info.ID, genIDs = genIDs[0], genIDs[1:] + + if parts := info.GetPartitionInfo(); parts != nil { + for i := range parts.Definitions { + parts.Definitions[i].ID, genIDs = genIDs[0], genIDs[1:] + } + } + + job, err := d.createTableWithInfoJob(ctx, dbName, info, onExist, true) + if err != nil { + return errors.Trace(err) + } + if job == nil { + continue + } + + // if jobs.Type == model.ActionCreateTables, it is initialized + // if not, initialize jobs by job.XXXX + if jobs.Type != model.ActionCreateTables { + jobs.Type = model.ActionCreateTables + jobs.SchemaID = job.SchemaID + jobs.SchemaName = job.SchemaName + } + + // append table job args + if len(job.Args) != 1 { + return errors.Trace(fmt.Errorf("except only one argument")) + } + info, ok := job.Args[0].(*model.TableInfo) + if !ok { + return errors.Trace(fmt.Errorf("except table info")) + } + args = append(args, info) + } + if len(args) == 0 { + return nil + } + jobs.Args = append(jobs.Args, args) + + err = d.doDDLJob(ctx, jobs) + if err != nil { + // table exists, but if_not_exists flags is true, so we ignore this error. + if onExist == OnExistIgnore && infoschema.ErrTableExists.Equal(err) { + ctx.GetSessionVars().StmtCtx.AppendNote(err) + err = nil + } + return errors.Trace(d.callHookOnChanged(err)) + } + + for j := range infos { + if err = d.createTableWithInfoPost(ctx, infos[j], jobs.SchemaID); err != nil { + return errors.Trace(d.callHookOnChanged(err)) + } + } + + return nil } // preSplitAndScatter performs pre-split and scatter of the table's regions. @@ -2227,7 +2357,7 @@ func (d *ddl) CreateView(ctx sessionctx.Context, s *ast.CreateViewStmt) (err err onExist = OnExistReplace } - return d.CreateTableWithInfo(ctx, s.ViewName.Schema, tbInfo, onExist, false /*tryRetainID*/) + return d.CreateTableWithInfo(ctx, s.ViewName.Schema, tbInfo, onExist) } func buildViewInfo(ctx sessionctx.Context, s *ast.CreateViewStmt) (*model.ViewInfo, error) { @@ -6203,7 +6333,7 @@ func (d *ddl) CreateSequence(ctx sessionctx.Context, stmt *ast.CreateSequenceStm onExist = OnExistIgnore } - return d.CreateTableWithInfo(ctx, ident.Schema, tbInfo, onExist, false /*tryRetainID*/) + return d.CreateTableWithInfo(ctx, ident.Schema, tbInfo, onExist) } func (d *ddl) AlterSequence(ctx sessionctx.Context, stmt *ast.AlterSequenceStmt) error { diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index c2b3dce8ccfff..a6f75a8965334 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -412,8 +412,16 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) { err = w.deleteRange(w.ddlJobCtx, job) } } - if job.Type == model.ActionRecoverTable { + + switch job.Type { + case model.ActionRecoverTable: err = finishRecoverTable(w, job) + case model.ActionCreateTables: + if job.IsCancelled() { + // it may be too large that it can not be added to the history queue, too + // delete its arguments + job.Args = nil + } } if err != nil { return errors.Trace(err) @@ -759,6 +767,8 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, ver, err = onModifySchemaDefaultPlacement(t, job) case model.ActionCreateTable: ver, err = onCreateTable(d, t, job) + case model.ActionCreateTables: + ver, err = onCreateTables(d, t, job) case model.ActionRepairTable: ver, err = onRepairTable(d, t, job) case model.ActionCreateView: @@ -983,6 +993,21 @@ func updateSchemaVersion(t *meta.Meta, job *model.Job) (int64, error) { SchemaID: job.SchemaID, } switch job.Type { + case model.ActionCreateTables: + tableInfos := []*model.TableInfo{} + err = job.DecodeArgs(&tableInfos) + if err != nil { + return 0, errors.Trace(err) + } + diff.AffectedOpts = make([]*model.AffectedOption, len(tableInfos)) + for i := range tableInfos { + diff.AffectedOpts[i] = &model.AffectedOption{ + SchemaID: job.SchemaID, + OldSchemaID: job.SchemaID, + TableID: tableInfos[i].ID, + OldTableID: tableInfos[i].ID, + } + } case model.ActionTruncateTable: // Truncate table has two table ID, should be handled differently. err = job.DecodeArgs(&diff.TableID) diff --git a/ddl/table.go b/ddl/table.go index a60e5d9a76b20..0966e9a82ca5e 100644 --- a/ddl/table.go +++ b/ddl/table.go @@ -44,20 +44,12 @@ import ( const tiflashCheckTiDBHTTPAPIHalfInterval = 2500 * time.Millisecond -func onCreateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) { - failpoint.Inject("mockExceedErrorLimit", func(val failpoint.Value) { - if val.(bool) { - failpoint.Return(ver, errors.New("mock do job error")) - } - }) - +// DANGER: it is an internal function used by onCreateTable and onCreateTables, for reusing code. Be careful. +// 1. it expects the argument of job has been deserialized. +// 2. it won't call updateSchemaVersion, FinishTableJob and asyncNotifyEvent. +func createTable(d *ddlCtx, t *meta.Meta, job *model.Job) (*model.TableInfo, error) { schemaID := job.SchemaID - tbInfo := &model.TableInfo{} - if err := job.DecodeArgs(tbInfo); err != nil { - // Invalid arguments, cancel this job. - job.State = model.JobStateCancelled - return ver, errors.Trace(err) - } + tbInfo := job.Args[0].(*model.TableInfo) tbInfo.State = model.StateNone err := checkTableNotExists(d, t, schemaID, tbInfo.Name.L) @@ -65,12 +57,7 @@ func onCreateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) if infoschema.ErrDatabaseNotExists.Equal(err) || infoschema.ErrTableExists.Equal(err) { job.State = model.JobStateCancelled } - return ver, errors.Trace(err) - } - - ver, err = updateSchemaVersion(t, job) - if err != nil { - return ver, errors.Trace(err) + return tbInfo, errors.Trace(err) } switch tbInfo.State { @@ -80,42 +67,114 @@ func onCreateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) tbInfo.UpdateTS = t.StartTS err = createTableOrViewWithCheck(t, job, schemaID, tbInfo) if err != nil { - return ver, errors.Trace(err) + return tbInfo, errors.Trace(err) } failpoint.Inject("checkOwnerCheckAllVersionsWaitTime", func(val failpoint.Value) { if val.(bool) { - failpoint.Return(ver, errors.New("mock create table error")) + failpoint.Return(tbInfo, errors.New("mock create table error")) } }) // build table & partition bundles if any. if err = checkAllTablePlacementPoliciesExistAndCancelNonExistJob(t, job, tbInfo); err != nil { - return ver, errors.Trace(err) + return tbInfo, errors.Trace(err) } bundles, err := placement.NewFullTableBundles(t, tbInfo) if err != nil { job.State = model.JobStateCancelled - return ver, errors.Trace(err) + return tbInfo, errors.Trace(err) } // Send the placement bundle to PD. err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles) if err != nil { job.State = model.JobStateCancelled - return ver, errors.Wrapf(err, "failed to notify PD the placement rules") + return tbInfo, errors.Wrapf(err, "failed to notify PD the placement rules") } - // Finish this job. - job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tbInfo) - asyncNotifyEvent(d, &util.Event{Tp: model.ActionCreateTable, TableInfo: tbInfo}) - return ver, nil + return tbInfo, nil default: - return ver, ErrInvalidDDLState.GenWithStackByArgs("table", tbInfo.State) + return tbInfo, ErrInvalidDDLState.GenWithStackByArgs("table", tbInfo.State) } } +func onCreateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) { + failpoint.Inject("mockExceedErrorLimit", func(val failpoint.Value) { + if val.(bool) { + failpoint.Return(ver, errors.New("mock do job error")) + } + }) + + // just decode, createTable will use it as Args[0] + tbInfo := &model.TableInfo{} + if err := job.DecodeArgs(tbInfo); err != nil { + // Invalid arguments, cancel this job. + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + + tbInfo, err := createTable(d, t, job) + if err != nil { + return ver, errors.Trace(err) + } + + ver, err = updateSchemaVersion(t, job) + if err != nil { + return ver, errors.Trace(err) + } + + // Finish this job. + job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tbInfo) + asyncNotifyEvent(d, &util.Event{Tp: model.ActionCreateTable, TableInfo: tbInfo}) + return ver, errors.Trace(err) +} + +func onCreateTables(d *ddlCtx, t *meta.Meta, job *model.Job) (int64, error) { + var ver int64 + + args := []*model.TableInfo{} + err := job.DecodeArgs(&args) + if err != nil { + // Invalid arguments, cancel this job. + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + + // We don't construct jobs for every table, but only tableInfo + // The following loop creates a stub job for every table + // + // &*job clones a stub job from the ActionCreateTables job + stubJob := &*job + stubJob.Args = make([]interface{}, 1) + for i := range args { + stubJob.TableID = args[i].ID + stubJob.Args[0] = args[i] + tbInfo, err := createTable(d, t, stubJob) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } + args[i] = tbInfo + } + + ver, err = updateSchemaVersion(t, job) + if err != nil { + return ver, errors.Trace(err) + } + + job.State = model.JobStateDone + job.SchemaState = model.StatePublic + job.BinlogInfo.SetTableInfos(ver, args) + + for i := range args { + asyncNotifyEvent(d, &util.Event{Tp: model.ActionCreateTable, TableInfo: args[i]}) + } + + return ver, errors.Trace(err) +} + func createTableOrViewWithCheck(t *meta.Meta, job *model.Job, schemaID int64, tbInfo *model.TableInfo) error { err := checkTableInfoValid(tbInfo) if err != nil { diff --git a/ddl/table_test.go b/ddl/table_test.go index a3304560bafb6..3f5d100df2250 100644 --- a/ddl/table_test.go +++ b/ddl/table_test.go @@ -393,3 +393,50 @@ func ExportTestRenameTables(t *testing.T) { require.Equal(t, wantTblInfos[0].Name.L, "tt1") require.Equal(t, wantTblInfos[1].Name.L, "tt2") } + +func TestCreateTables(t *testing.T) { + store, err := mockstore.NewMockStore() + require.NoError(t, err) + ddl, err := testNewDDLAndStart( + context.Background(), + WithStore(store), + WithLease(testLease), + ) + require.NoError(t, err) + + dbInfo, err := testSchemaInfo(ddl, "test_table") + require.NoError(t, err) + testCreateSchemaT(t, testNewContext(ddl), ddl, dbInfo) + + ctx := testNewContext(ddl) + + infos := []*model.TableInfo{} + genIDs, err := ddl.genGlobalIDs(3) + require.NoError(t, err) + + infos = append(infos, &model.TableInfo{ + ID: genIDs[0], + Name: model.NewCIStr("s1"), + }) + infos = append(infos, &model.TableInfo{ + ID: genIDs[1], + Name: model.NewCIStr("s2"), + }) + infos = append(infos, &model.TableInfo{ + ID: genIDs[2], + Name: model.NewCIStr("s3"), + }) + + job := &model.Job{ + SchemaID: dbInfo.ID, + Type: model.ActionCreateTables, + BinlogInfo: &model.HistoryInfo{}, + Args: []interface{}{infos}, + } + err = ddl.doDDLJob(ctx, job) + require.NoError(t, err) + + testGetTableT(t, ddl, dbInfo.ID, genIDs[0]) + testGetTableT(t, ddl, dbInfo.ID, genIDs[1]) + testGetTableT(t, ddl, dbInfo.ID, genIDs[2]) +} diff --git a/executor/brie.go b/executor/brie.go index e72958d9d5f7d..bfeac3a740e11 100644 --- a/executor/brie.go +++ b/executor/brie.go @@ -483,7 +483,7 @@ func (gs *tidbGlueSession) CreateDatabase(ctx context.Context, schema *model.DBI if len(schema.Charset) == 0 { schema.Charset = mysql.DefaultCharset } - return d.CreateSchemaWithInfo(gs.se, schema, ddl.OnExistIgnore, true) + return d.CreateSchemaWithInfo(gs.se, schema, ddl.OnExistIgnore) } // CreateTable implements glue.Session @@ -498,7 +498,7 @@ func (gs *tidbGlueSession) CreateTable(ctx context.Context, dbName model.CIStr, table.Partition = &newPartition } - return d.CreateTableWithInfo(gs.se, dbName, table, ddl.OnExistIgnore, true) + return d.CreateTableWithInfo(gs.se, dbName, table, ddl.OnExistIgnore) } // Close implements glue.Session diff --git a/executor/executor.go b/executor/executor.go index 163588e55b6d9..6f4867438134a 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -472,6 +472,16 @@ func (e *DDLJobRetriever) appendJobToChunk(req *chunk.Chunk, job *model.Job, che if job.BinlogInfo.TableInfo != nil { tableName = job.BinlogInfo.TableInfo.Name.L } + if job.BinlogInfo.MultipleTableInfos != nil { + tablenames := new(strings.Builder) + for i, affect := range job.BinlogInfo.MultipleTableInfos { + if i > 0 { + fmt.Fprintf(tablenames, ",") + } + fmt.Fprintf(tablenames, "%s", affect.Name.L) + } + tableName = tablenames.String() + } if len(schemaName) == 0 && job.BinlogInfo.DBInfo != nil { schemaName = job.BinlogInfo.DBInfo.Name.L } diff --git a/infoschema/builder.go b/infoschema/builder.go index 12b8807f84385..b4dbc8f22bf28 100644 --- a/infoschema/builder.go +++ b/infoschema/builder.go @@ -57,8 +57,6 @@ type Builder struct { // Return the detail updated table IDs that are produced from SchemaDiff and an error. func (b *Builder) ApplyDiff(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { b.is.schemaMetaVersion = diff.Version - var tblIDs []int64 - var err error switch diff.Type { case model.ActionCreateSchema: return nil, b.applyCreateSchema(m, diff) @@ -74,19 +72,39 @@ func (b *Builder) ApplyDiff(m *meta.Meta, diff *model.SchemaDiff) ([]int64, erro return b.applyDropPolicy(diff.SchemaID), nil case model.ActionAlterPlacementPolicy: return b.applyAlterPolicy(m, diff) + case model.ActionTruncateTablePartition, model.ActionTruncateTable: + return b.applyTruncateTableOrPartition(m, diff) + case model.ActionDropTable, model.ActionDropTablePartition: + return b.applyDropTableOrParition(m, diff) + case model.ActionRecoverTable: + return b.applyRecoverTable(m, diff) + case model.ActionCreateTables: + return b.applyCreateTables(m, diff) default: - switch diff.Type { - case model.ActionTruncateTablePartition, model.ActionTruncateTable: - tblIDs, err = b.applyTruncateTableOrPartition(m, diff) - case model.ActionDropTable, model.ActionDropTablePartition: - tblIDs, err = b.applyDropTableOrParition(m, diff) - case model.ActionRecoverTable: - tblIDs, err = b.applyRecoverTable(m, diff) - default: - tblIDs, err = b.applyDefaultAction(m, diff) + return b.applyDefaultAction(m, diff) + } +} + +func (b *Builder) applyCreateTables(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { + tblIDs := make([]int64, 0, len(diff.AffectedOpts)) + if diff.AffectedOpts != nil { + for _, opt := range diff.AffectedOpts { + affectedDiff := &model.SchemaDiff{ + Version: diff.Version, + Type: model.ActionCreateTable, + SchemaID: opt.SchemaID, + TableID: opt.TableID, + OldSchemaID: opt.OldSchemaID, + OldTableID: opt.OldTableID, + } + affectedIDs, err := b.ApplyDiff(m, affectedDiff) + if err != nil { + return nil, errors.Trace(err) + } + tblIDs = append(tblIDs, affectedIDs...) } } - return tblIDs, err + return tblIDs, nil } func (b *Builder) applyTruncateTableOrPartition(m *meta.Meta, diff *model.SchemaDiff) ([]int64, error) { diff --git a/parser/model/ddl.go b/parser/model/ddl.go index 9716cea38cd23..0636556120a5a 100644 --- a/parser/model/ddl.go +++ b/parser/model/ddl.go @@ -94,12 +94,14 @@ const ( ActionAlterCacheTable ActionType = 57 ActionAlterTableStatsOptions ActionType = 58 ActionAlterNoCacheTable ActionType = 59 + ActionCreateTables ActionType = 60 ) var actionMap = map[ActionType]string{ ActionCreateSchema: "create schema", ActionDropSchema: "drop schema", ActionCreateTable: "create table", + ActionCreateTables: "create tables", ActionDropTable: "drop table", ActionAddColumn: "add column", ActionDropColumn: "drop column", @@ -194,6 +196,15 @@ func (h *HistoryInfo) AddTableInfo(schemaVer int64, tblInfo *TableInfo) { h.TableInfo = tblInfo } +// SetTableInfos is like AddTableInfo, but will add multiple table infos to the binlog. +func (h *HistoryInfo) SetTableInfos(schemaVer int64, tblInfos []*TableInfo) { + h.SchemaVersion = schemaVer + h.MultipleTableInfos = make([]*TableInfo, len(tblInfos)) + for i, info := range tblInfos { + h.MultipleTableInfos[i] = info + } +} + // Clean cleans history information. func (h *HistoryInfo) Clean() { h.SchemaVersion = 0