Skip to content

Commit

Permalink
brie: support batch ddl for sql restore (#49089) (#49851)
Browse files Browse the repository at this point in the history
close #48301
  • Loading branch information
ti-chi-bot authored Dec 28, 2023
1 parent 88352e5 commit 767ec62
Show file tree
Hide file tree
Showing 14 changed files with 714 additions and 426 deletions.
7 changes: 0 additions & 7 deletions br/pkg/gluetidb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@ go_library(
"//domain",
"//executor",
"//kv",
"//meta/autoid",
"//parser/model",
"//parser/mysql",
"//session",
"//sessionctx",
"@com_github_pingcap_errors//:errors",
Expand All @@ -34,14 +32,9 @@ go_test(
flaky = True,
deps = [
"//br/pkg/glue",
"//ddl",
"//kv",
"//meta",
"//parser/model",
"//sessionctx",
"//testkit",
"//types",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//require",
],
)
134 changes: 8 additions & 126 deletions br/pkg/gluetidb/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@
package gluetidb

import (
"bytes"
"context"
"strings"

"github.com/pingcap/errors"
"github.com/pingcap/log"
Expand All @@ -17,9 +15,7 @@ import (
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx"
pd "github.com/tikv/pd/client"
Expand All @@ -32,13 +28,8 @@ var (
_ glue.Glue = Glue{}
)

const (
defaultCapOfCreateTable = 512
defaultCapOfCreateDatabase = 64
brComment = `/*from(br)*/`
)
const brComment = `/*from(br)*/`

// New makes a new tidb glue.
func New() Glue {
log.Debug("enabling no register config")
config.UpdateGlobal(func(conf *config.Config) {
Expand Down Expand Up @@ -192,17 +183,7 @@ func (gs *tidbSession) ExecuteInternal(ctx context.Context, sql string, args ...

// CreateDatabase implements glue.Session.
func (gs *tidbSession) CreateDatabase(ctx context.Context, schema *model.DBInfo) error {
d := domain.GetDomain(gs.se).DDL()
query, err := gs.showCreateDatabase(schema)
if err != nil {
return errors.Trace(err)
}
gs.se.SetValue(sessionctx.QueryString, query)
schema = schema.Clone()
if len(schema.Charset) == 0 {
schema.Charset = mysql.DefaultCharset
}
return d.CreateSchemaWithInfo(gs.se, schema, ddl.OnExistIgnore)
return errors.Trace(executor.BRIECreateDatabase(gs.se, schema, brComment))
}

// CreatePlacementPolicy implements glue.Session.
Expand All @@ -213,91 +194,16 @@ func (gs *tidbSession) CreatePlacementPolicy(ctx context.Context, policy *model.
return d.CreatePlacementPolicyWithInfo(gs.se, policy, ddl.OnExistIgnore)
}

// SplitBatchCreateTable provide a way to split batch into small batch when batch size is large than 6 MB.
// The raft entry has limit size of 6 MB, a batch of CreateTables may hit this limitation
// TODO: shall query string be set for each split batch create, it looks does not matter if we set once for all.
func (gs *tidbSession) SplitBatchCreateTable(schema model.CIStr, infos []*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error {
var err error
d := domain.GetDomain(gs.se).DDL()

if err = d.BatchCreateTableWithInfo(gs.se, schema, infos, append(cs, ddl.OnExistIgnore)...); kv.ErrEntryTooLarge.Equal(err) {
log.Info("entry too large, split batch create table", zap.Int("num table", len(infos)))
if len(infos) == 1 {
return err
}
mid := len(infos) / 2
err = gs.SplitBatchCreateTable(schema, infos[:mid], cs...)
if err != nil {
return err
}
err = gs.SplitBatchCreateTable(schema, infos[mid:], cs...)
if err != nil {
return err
}
return nil
}
return err
}

// CreateTables implements glue.BatchCreateTableSession.
func (gs *tidbSession) CreateTables(ctx context.Context, tables map[string][]*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error {
var dbName model.CIStr

// Disable foreign key check when batch create tables.
gs.se.GetSessionVars().ForeignKeyChecks = false
for db, tablesInDB := range tables {
dbName = model.NewCIStr(db)
queryBuilder := strings.Builder{}
cloneTables := make([]*model.TableInfo, 0, len(tablesInDB))
for _, table := range tablesInDB {
query, err := gs.showCreateTable(table)
if err != nil {
return errors.Trace(err)
}

queryBuilder.WriteString(query)
queryBuilder.WriteString(";")

table = table.Clone()
// Clone() does not clone partitions yet :(
if table.Partition != nil {
newPartition := *table.Partition
newPartition.Definitions = append([]model.PartitionDefinition{}, table.Partition.Definitions...)
table.Partition = &newPartition
}
cloneTables = append(cloneTables, table)
}
gs.se.SetValue(sessionctx.QueryString, queryBuilder.String())

if err := gs.SplitBatchCreateTable(dbName, cloneTables, cs...); err != nil {
//It is possible to failure when TiDB does not support model.ActionCreateTables.
//In this circumstance, BatchCreateTableWithInfo returns errno.ErrInvalidDDLJob,
//we fall back to old way that creating table one by one
log.Warn("batch create table from tidb failure", zap.Error(err))
return err
}
}

return nil
func (gs *tidbSession) CreateTables(_ context.Context,
tables map[string][]*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error {
return errors.Trace(executor.BRIECreateTables(gs.se, tables, brComment, cs...))
}

// CreateTable implements glue.Session.
func (gs *tidbSession) CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error {
d := domain.GetDomain(gs.se).DDL()
query, err := gs.showCreateTable(table)
if err != nil {
return errors.Trace(err)
}
gs.se.SetValue(sessionctx.QueryString, query)
// Clone() does not clone partitions yet :(
table = table.Clone()
if table.Partition != nil {
newPartition := *table.Partition
newPartition.Definitions = append([]model.PartitionDefinition{}, table.Partition.Definitions...)
table.Partition = &newPartition
}

return d.CreateTableWithInfo(gs.se, dbName, table, append(cs, ddl.OnExistIgnore)...)
func (gs *tidbSession) CreateTable(_ context.Context, dbName model.CIStr,
table *model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error {
return errors.Trace(executor.BRIECreateTable(gs.se, dbName, table, brComment, cs...))
}

// Close implements glue.Session.
Expand All @@ -310,30 +216,6 @@ func (gs *tidbSession) GetGlobalVariable(name string) (string, error) {
return gs.se.GetSessionVars().GlobalVarsAccessor.GetTiDBTableValue(name)
}

// showCreateTable shows the result of SHOW CREATE TABLE from a TableInfo.
func (gs *tidbSession) showCreateTable(tbl *model.TableInfo) (string, error) {
table := tbl.Clone()
table.AutoIncID = 0
result := bytes.NewBuffer(make([]byte, 0, defaultCapOfCreateTable))
// this can never fail.
_, _ = result.WriteString(brComment)
if err := executor.ConstructResultOfShowCreateTable(gs.se, tbl, autoid.Allocators{}, result); err != nil {
return "", errors.Trace(err)
}
return result.String(), nil
}

// showCreateDatabase shows the result of SHOW CREATE DATABASE from a dbInfo.
func (gs *tidbSession) showCreateDatabase(db *model.DBInfo) (string, error) {
result := bytes.NewBuffer(make([]byte, 0, defaultCapOfCreateDatabase))
// this can never fail.
_, _ = result.WriteString(brComment)
if err := executor.ConstructResultOfShowCreateDatabase(gs.se, db, true, result); err != nil {
return "", errors.Trace(err)
}
return result.String(), nil
}

func (gs *tidbSession) showCreatePlacementPolicy(policy *model.PolicyInfo) string {
return executor.ConstructResultOfShowCreatePlacementPolicy(policy)
}
Expand Down
Loading

0 comments on commit 767ec62

Please sign in to comment.