ddl: add COMMENTS column to DDL jobs and enhance job reorg meta handling #57392

Merged · 19 commits · Nov 21, 2024
2 changes: 1 addition & 1 deletion pkg/ddl/column_modify_test.go
@@ -622,7 +622,7 @@ func TestModifyColumnReorgCheckpoint(t *testing.T) {
tk.MustExec("use test")
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")
tk.MustExec("set global tidb_ddl_reorg_worker_cnt = 1;")
tk.MustExec("set @@tidb_ddl_reorg_worker_cnt = 1;")
tk.MustExec("create table t (a int primary key, b bigint);")
rowCnt := 10
for i := 0; i < rowCnt; i++ {
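
The change above swaps a global setting for a session-scoped one: with the reworked reorg-meta handling, `tidb_ddl_reorg_worker_cnt` (and `tidb_ddl_reorg_batch_size`) are read from the submitting session and frozen into the job's reorg meta. A minimal sketch of that usage, assuming the standard testkit helpers (not part of this diff):

```go
package ddl_test

import (
	"testing"

	"github.com/pingcap/tidb/pkg/testkit"
)

// Sketch only: session-scoped reorg settings are captured at job-submission
// time into job.ReorgMeta (see initJobReorgMetaFromVariables below).
func TestSessionScopedReorgSettingsSketch(t *testing.T) {
	store := testkit.CreateMockStore(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")
	tk.MustExec("set @@tidb_ddl_reorg_worker_cnt = 1;")  // session scope, as in the test above
	tk.MustExec("set @@tidb_ddl_reorg_batch_size = 64;") // assumed session-settable, mirroring worker_cnt
	tk.MustExec("create table t (a int primary key, b bigint);")
	tk.MustExec("alter table t add index idx_b(b);") // this job's reorg meta records the values above
}
```
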
177 changes: 98 additions & 79 deletions pkg/ddl/executor.go
@@ -2005,20 +2005,14 @@ func (e *executor) multiSchemaChange(ctx sessionctx.Context, ti ast.Ident, info
Type: model.ActionMultiSchemaChange,
BinlogInfo: &model.HistoryInfo{},
MultiSchemaInfo: info,
ReorgMeta: nil,
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
InvolvingSchemaInfo: involvingSchemaInfo,
SQLMode: ctx.GetSessionVars().SQLMode,
}
if containsDistTaskSubJob(subJobs) {
job.ReorgMeta, err = newReorgMetaFromVariables(job, ctx)
if err != nil {
return err
}
} else {
job.ReorgMeta = NewDDLReorgMeta(ctx)
err = initJobReorgMetaFromVariables(job, ctx)
if err != nil {
return errors.Trace(err)
}

err = checkMultiSchemaInfo(info, t)
if err != nil {
return errors.Trace(err)
@@ -2027,16 +2021,6 @@ func (e *executor) multiSchemaChange(ctx sessionctx.Context, ti ast.Ident, info
return e.DoDDLJob(ctx, job)
}

func containsDistTaskSubJob(subJobs []*model.SubJob) bool {
for _, sub := range subJobs {
if sub.Type == model.ActionAddIndex ||
sub.Type == model.ActionAddPrimaryKey {
return true
}
}
return false
}

func (e *executor) RebaseAutoID(ctx sessionctx.Context, ident ast.Ident, newBase int64, tp autoid.AllocatorType, force bool) error {
schema, t, err := e.getSchemaAndTableByIdent(ident)
if err != nil {
@@ -2451,10 +2435,13 @@ func (e *executor) AlterTablePartitioning(ctx sessionctx.Context, ident ast.Iden
TableName: t.Meta().Name.L,
Type: model.ActionAlterTablePartitioning,
BinlogInfo: &model.HistoryInfo{},
ReorgMeta: NewDDLReorgMeta(ctx),
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
SQLMode: ctx.GetSessionVars().SQLMode,
}
err = initJobReorgMetaFromVariables(job, ctx)
if err != nil {
return err
}

args := &model.TablePartitionArgs{
PartNames: partNames,
@@ -2517,10 +2504,13 @@ func (e *executor) ReorganizePartitions(ctx sessionctx.Context, ident ast.Ident,
TableName: t.Meta().Name.L,
Type: model.ActionReorganizePartition,
BinlogInfo: &model.HistoryInfo{},
ReorgMeta: NewDDLReorgMeta(ctx),
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
SQLMode: ctx.GetSessionVars().SQLMode,
}
err = initJobReorgMetaFromVariables(job, ctx)
if err != nil {
return errors.Trace(err)
}
args := &model.TablePartitionArgs{
PartNames: partNames,
PartInfo: partInfo,
@@ -2583,10 +2573,13 @@ func (e *executor) RemovePartitioning(ctx sessionctx.Context, ident ast.Ident, s
TableName: meta.Name.L,
Type: model.ActionRemovePartitioning,
BinlogInfo: &model.HistoryInfo{},
ReorgMeta: NewDDLReorgMeta(ctx),
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
SQLMode: ctx.GetSessionVars().SQLMode,
}
err = initJobReorgMetaFromVariables(job, ctx)
if err != nil {
return errors.Trace(err)
}
args := &model.TablePartitionArgs{
PartNames: partNames,
PartInfo: partInfo,
@@ -3385,10 +3378,13 @@ func (e *executor) RenameColumn(ctx sessionctx.Context, ident ast.Ident, spec *a
TableName: tbl.Meta().Name.L,
Type: model.ActionModifyColumn,
BinlogInfo: &model.HistoryInfo{},
ReorgMeta: NewDDLReorgMeta(ctx),
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
SQLMode: ctx.GetSessionVars().SQLMode,
}
err = initJobReorgMetaFromVariables(job, ctx)
if err != nil {
return err
}

args := &model.ModifyColumnArgs{
Column: newCol,
@@ -4645,11 +4641,10 @@ func (e *executor) CreatePrimaryKey(ctx sessionctx.Context, ti ast.Ident, indexN
OpType: model.OpAddIndex,
}

reorgMeta, err := newReorgMetaFromVariables(job, ctx)
err = initJobReorgMetaFromVariables(job, ctx)
if err != nil {
return err
}
job.ReorgMeta = reorgMeta

err = e.doDDLJob2(ctx, job, args)
return errors.Trace(err)
@@ -4758,10 +4753,7 @@ func (e *executor) createVectorIndex(ctx sessionctx.Context, ti ast.Ident, index
return errors.Trace(err)
}

job, err := buildAddIndexJobWithoutTypeAndArgs(ctx, schema, t)
if err != nil {
return errors.Trace(err)
}
job := buildAddIndexJobWithoutTypeAndArgs(ctx, schema, t)
job.Version = model.GetJobVerInUse()
job.Type = model.ActionAddVectorIndex
indexPartSpecifications[0].Expr = nil
@@ -4788,32 +4780,20 @@
return errors.Trace(err)
}

func buildAddIndexJobWithoutTypeAndArgs(ctx sessionctx.Context, schema *model.DBInfo, t table.Table) (*model.Job, error) {
tzName, tzOffset := ddlutil.GetTimeZone(ctx)
func buildAddIndexJobWithoutTypeAndArgs(ctx sessionctx.Context, schema *model.DBInfo, t table.Table) *model.Job {
charset, collate := ctx.GetSessionVars().GetCharsetInfo()
job := &model.Job{
SchemaID: schema.ID,
TableID: t.Meta().ID,
SchemaName: schema.Name.L,
TableName: t.Meta().Name.L,
BinlogInfo: &model.HistoryInfo{},
ReorgMeta: &model.DDLReorgMeta{
SQLMode: ctx.GetSessionVars().SQLMode,
Warnings: make(map[errors.ErrorID]*terror.Error),
WarningsCount: make(map[errors.ErrorID]int64),
Location: &model.TimeZoneLocation{Name: tzName, Offset: tzOffset},
},
Priority: ctx.GetSessionVars().DDLReorgPriority,
Charset: charset,
Collate: collate,
SQLMode: ctx.GetSessionVars().SQLMode,
}
reorgMeta, err := newReorgMetaFromVariables(job, ctx)
if err != nil {
return nil, errors.Trace(err)
Priority: ctx.GetSessionVars().DDLReorgPriority,
Charset: charset,
Collate: collate,
SQLMode: ctx.GetSessionVars().SQLMode,
}
job.ReorgMeta = reorgMeta
return job, nil
return job
}

func (e *executor) CreateIndex(ctx sessionctx.Context, stmt *ast.CreateIndexStmt) error {
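
Since `buildAddIndexJobWithoutTypeAndArgs` no longer fills `ReorgMeta` (and can no longer fail), its callers follow a two-step pattern: build the bare job, then attach reorg meta in one place. A condensed sketch of the call sites in this file, assuming the surrounding executor context:

```go
// Condensed sketch of the add-index call sites (runs inside an *executor
// method where ctx, schema and t are already resolved).
job := buildAddIndexJobWithoutTypeAndArgs(ctx, schema, t)
job.Version = model.GetJobVerInUse()
job.Type = model.ActionAddIndex
job.CDCWriteSource = ctx.GetSessionVars().CDCWriteSource

// Reorg meta (concurrency, batch size, dist-task and fast-reorg flags) is now
// attached by a single helper, keyed on job.Type.
if err := initJobReorgMetaFromVariables(job, ctx); err != nil {
	return errors.Trace(err)
}
```
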
@@ -4907,15 +4887,17 @@ func (e *executor) createIndex(ctx sessionctx.Context, ti ast.Ident, keyType ast
// global is set to 'false' is just there to be backwards compatible,
// to avoid unmarshal issues, it is now part of indexOption.
global := false
job, err := buildAddIndexJobWithoutTypeAndArgs(ctx, schema, t)
if err != nil {
return errors.Trace(err)
}
job := buildAddIndexJobWithoutTypeAndArgs(ctx, schema, t)

job.Version = model.GetJobVerInUse()
job.Type = model.ActionAddIndex
job.CDCWriteSource = ctx.GetSessionVars().CDCWriteSource

err = initJobReorgMetaFromVariables(job, ctx)
if err != nil {
return errors.Trace(err)
}

args := &model.ModifyIndexArgs{
IndexArgs: []*model.IndexArg{{
Unique: unique,
@@ -4937,44 +4919,81 @@ func (e *executor) createIndex(ctx sessionctx.Context, ti ast.Ident, keyType ast
return errors.Trace(err)
}

func newReorgMetaFromVariables(job *model.Job, sctx sessionctx.Context) (*model.DDLReorgMeta, error) {
reorgMeta := NewDDLReorgMeta(sctx)
reorgMeta.IsDistReorg = variable.EnableDistTask.Load()
reorgMeta.IsFastReorg = variable.EnableFastReorg.Load()
reorgMeta.TargetScope = variable.ServiceScope.Load()
if sv, ok := sctx.GetSessionVars().GetSystemVar(variable.TiDBDDLReorgWorkerCount); ok {
reorgMeta.Concurrency = variable.TidbOptInt(sv, 0)
func initJobReorgMetaFromVariables(job *model.Job, sctx sessionctx.Context) error {
m := NewDDLReorgMeta(sctx)
setReorgParam := func() {
if sv, ok := sctx.GetSessionVars().GetSystemVar(variable.TiDBDDLReorgWorkerCount); ok {
m.Concurrency = variable.TidbOptInt(sv, 0)
}
if sv, ok := sctx.GetSessionVars().GetSystemVar(variable.TiDBDDLReorgBatchSize); ok {
m.BatchSize = variable.TidbOptInt(sv, 0)
}
}
if sv, ok := sctx.GetSessionVars().GetSystemVar(variable.TiDBDDLReorgBatchSize); ok {
reorgMeta.BatchSize = variable.TidbOptInt(sv, 0)
setDistTaskParam := func() error {
m.IsDistReorg = variable.EnableDistTask.Load()
m.IsFastReorg = variable.EnableFastReorg.Load()
m.TargetScope = variable.ServiceScope.Load()
if hasSysDB(job) {
if m.IsDistReorg {
logutil.DDLLogger().Info("cannot use distributed task execution on system DB",
zap.Stringer("job", job))
}
m.IsDistReorg = false
m.IsFastReorg = false
failpoint.Inject("reorgMetaRecordFastReorgDisabled", func(_ failpoint.Value) {
LastReorgMetaFastReorgDisabled = true
})
}
if m.IsDistReorg && !m.IsFastReorg {
return dbterror.ErrUnsupportedDistTask
}
return nil
}

if reorgMeta.IsDistReorg && !reorgMeta.IsFastReorg {
return nil, dbterror.ErrUnsupportedDistTask
}
if hasSysDB(job) {
if reorgMeta.IsDistReorg {
logutil.DDLLogger().Info("cannot use distributed task execution on system DB",
zap.Stringer("job", job))
switch job.Type {
case model.ActionAddIndex, model.ActionAddPrimaryKey:
setReorgParam()
err := setDistTaskParam()
if err != nil {
return err
}
reorgMeta.IsDistReorg = false
reorgMeta.IsFastReorg = false
failpoint.Inject("reorgMetaRecordFastReorgDisabled", func(_ failpoint.Value) {
LastReorgMetaFastReorgDisabled = true
})
case model.ActionReorganizePartition,
model.ActionRemovePartitioning,
model.ActionAlterTablePartitioning,
model.ActionModifyColumn:
setReorgParam()
case model.ActionMultiSchemaChange:
for _, sub := range job.MultiSchemaInfo.SubJobs {
switch sub.Type {
case model.ActionAddIndex, model.ActionAddPrimaryKey:
setReorgParam()
err := setDistTaskParam()
if err != nil {
return err
}
case model.ActionReorganizePartition,
model.ActionRemovePartitioning,
model.ActionAlterTablePartitioning,
model.ActionModifyColumn:
setReorgParam()
continue
}
}
default:
return nil
}

job.ReorgMeta = m
logutil.DDLLogger().Info("initialize reorg meta",
zap.String("jobSchema", job.SchemaName),
zap.String("jobTable", job.TableName),
zap.Stringer("jobType", job.Type),
zap.Bool("enableDistTask", reorgMeta.IsDistReorg),
zap.Bool("enableFastReorg", reorgMeta.IsFastReorg),
zap.String("targetScope", reorgMeta.TargetScope),
zap.Int("concurrency", reorgMeta.Concurrency),
zap.Int("batchSize", reorgMeta.BatchSize),
zap.Bool("enableDistTask", m.IsDistReorg),
zap.Bool("enableFastReorg", m.IsFastReorg),
zap.String("targetScope", m.TargetScope),
zap.Int("concurrency", m.Concurrency),
zap.Int("batchSize", m.BatchSize),
)
return reorgMeta, nil
return nil
}

// LastReorgMetaFastReorgDisabled is used for test.
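
The helper above centralizes two guards that were previously scattered across call sites: distributed reorg is only valid together with fast reorg, and both are disabled for jobs that touch system databases. A standalone sketch of just that decision logic (hypothetical helper, not part of the PR):

```go
package sketch

import "errors"

// Sketch of the dist-task guard in initJobReorgMetaFromVariables (names are
// illustrative; the real code uses hasSysDB and dbterror.ErrUnsupportedDistTask).
func resolveDistReorg(distReorg, fastReorg, touchesSysDB bool) (useDist, useFast bool, err error) {
	useDist, useFast = distReorg, fastReorg
	if touchesSysDB {
		// System-DB jobs fall back to transactional backfill.
		useDist, useFast = false, false
	}
	if useDist && !useFast {
		// Distributed reorg requires fast reorg (tidb_ddl_enable_fast_reorg).
		return false, false, errors.New("unsupported: dist task without fast reorg")
	}
	return useDist, useFast, nil
}
```
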
2 changes: 1 addition & 1 deletion pkg/ddl/index.go
@@ -1274,7 +1274,7 @@ func pickBackfillType(job *model.Job) (model.ReorgType, error) {
func loadCloudStorageURI(w *worker, job *model.Job) {
jc := w.jobContext(job.ID, job.ReorgMeta)
jc.cloudStorageURI = variable.CloudStorageURI.Load()
job.ReorgMeta.UseCloudStorage = len(jc.cloudStorageURI) > 0
job.ReorgMeta.UseCloudStorage = len(jc.cloudStorageURI) > 0 && job.ReorgMeta.IsDistReorg
}

func doReorgWorkForCreateIndexMultiSchema(w *worker, jobCtx *jobContext, job *model.Job,
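
One behavioral consequence of the `loadCloudStorageURI` change: a configured cloud-storage URI no longer marks a job as using cloud storage unless the job actually runs as a distributed reorg. A tiny sketch of the resulting predicate (not part of the diff):

```go
package sketch

// Sketch: effective UseCloudStorage after this change. Local-ingest jobs keep
// UseCloudStorage=false even when a cloud storage URI is configured.
func useCloudStorage(cloudStorageURI string, isDistReorg bool) bool {
	return len(cloudStorageURI) > 0 && isDistReorg
}
```
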
6 changes: 3 additions & 3 deletions pkg/ddl/ingest/integration_test.go
@@ -44,7 +44,7 @@ func TestAddIndexIngestGeneratedColumns(t *testing.T) {
require.Len(t, rows, n)
for i := 0; i < n; i++ {
//nolint: forcetypeassert
jobTp := rows[i][3].(string)
jobTp := rows[i][12].(string)
require.True(t, strings.Contains(jobTp, "ingest"), jobTp)
}
}
@@ -99,7 +99,7 @@ func TestIngestError(t *testing.T) {
tk.MustExec("admin check table t;")
rows := tk.MustQuery("admin show ddl jobs 1;").Rows()
//nolint: forcetypeassert
jobTp := rows[0][3].(string)
jobTp := rows[0][12].(string)
require.True(t, strings.Contains(jobTp, "ingest"), jobTp)

tk.MustExec("drop table t;")
@@ -115,7 +115,7 @@ func TestIngestError(t *testing.T) {
tk.MustExec("admin check table t;")
rows = tk.MustQuery("admin show ddl jobs 1;").Rows()
//nolint: forcetypeassert
jobTp = rows[0][3].(string)
jobTp = rows[0][12].(string)
require.True(t, strings.Contains(jobTp, "ingest"), jobTp)
}

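
The index changes in these assertions follow from the new COMMENTS column: the backfill marker (`ingest`, `txn-merge`) that used to be embedded in the JOB_TYPE text at index 3 now appears in COMMENTS, which the updated tests read at index 12. A fragment in the style of the tests above (a sketch; the exact column layout is the one defined by this PR):

```go
// Fragment continuing the test context above: read the backfill marker from
// the new COMMENTS column rather than from the JOB_TYPE text.
rows := tk.MustQuery("admin show ddl jobs 1;").Rows()
//nolint: forcetypeassert
comments := rows[0][12].(string) // COMMENTS; index 3 (JOB_TYPE) no longer carries the marker
require.True(t, strings.Contains(comments, "ingest"), comments)
```
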
5 changes: 4 additions & 1 deletion pkg/ddl/modify_column.go
@@ -975,11 +975,14 @@ func GetModifiableColumnJob(
TableName: t.Meta().Name.L,
Type: model.ActionModifyColumn,
BinlogInfo: &model.HistoryInfo{},
ReorgMeta: NewDDLReorgMeta(sctx),
CtxVars: []any{needChangeColData},
CDCWriteSource: sctx.GetSessionVars().CDCWriteSource,
SQLMode: sctx.GetSessionVars().SQLMode,
}
err = initJobReorgMetaFromVariables(job, sctx)
if err != nil {
return nil, errors.Trace(err)
}

args := &model.ModifyColumnArgs{
Column: newCol.ColumnInfo,
1 change: 0 additions & 1 deletion pkg/ddl/multi_schema_change.go
@@ -194,7 +194,6 @@ func appendToSubJobs(m *model.MultiSchemaInfo, jobW *JobWrapper) error {
Revertible: true,
CtxVars: jobW.CtxVars,
ReorgTp: reorgTp,
UseCloud: false,
})
return nil
}
4 changes: 2 additions & 2 deletions pkg/ddl/multi_schema_change_test.go
@@ -641,9 +641,9 @@ func TestMultiSchemaChangeAdminShowDDLJobs(t *testing.T) {
assert.Equal(t, 3, len(rows))
assert.Equal(t, "test", rows[1][1])
assert.Equal(t, "t", rows[1][2])
assert.Equal(t, "add index /* subjob */ /* txn-merge */", rows[1][3])
assert.Equal(t, "add index /* subjob */", rows[1][3])
assert.Equal(t, "delete only", rows[1][4])
assert.Equal(t, "running", rows[1][len(rows[1])-1])
assert.Equal(t, "running", rows[1][len(rows[1])-2])
assert.True(t, len(rows[1][8].(string)) > 0)
assert.True(t, len(rows[1][9].(string)) > 0)
assert.True(t, len(rows[1][10].(string)) > 0)
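
Same root cause here: with a trailing COMMENTS column appended to `admin show ddl jobs`, STATE becomes the second-to-last cell, and the `/* txn-merge */` annotation moves out of the JOB_TYPE text. A short sketch of the position-independent way the updated assertions address STATE (assumes the test setup above):

```go
// Fragment in the style of the updated assertions: address STATE relative to
// the end of the row so the appended trailing column does not break the test.
rows := tk.MustQuery("admin show ddl jobs 3;").Rows()
//nolint: forcetypeassert
state := rows[1][len(rows[1])-2].(string) // STATE; the last cell is now the new COMMENTS column
assert.Equal(t, "running", state)
```
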
2 changes: 1 addition & 1 deletion pkg/ddl/partition_test.go
@@ -223,7 +223,7 @@ func TestReorganizePartitionRollback(t *testing.T) {
// check job rollback finished
rows := tk.MustQuery("admin show ddl jobs where JOB_ID=" + jobID).Rows()
require.Equal(t, 1, len(rows))
require.Equal(t, "rollback done", rows[0][len(rows[0])-1])
require.Equal(t, "rollback done", rows[0][len(rows[0])-2])

// check table meta after rollback
tk.MustQuery("show create table t1").Check(testkit.Rows("t1 CREATE TABLE `t1` (\n" +