ddl: add COMMENTS column to DDL jobs and enhance job reorg meta handling #57392

Merged
merged 19 commits into from
Nov 21, 2024
153 changes: 74 additions & 79 deletions pkg/ddl/executor.go
@@ -2005,20 +2005,14 @@ func (e *executor) multiSchemaChange(ctx sessionctx.Context, ti ast.Ident, info
Type: model.ActionMultiSchemaChange,
BinlogInfo: &model.HistoryInfo{},
MultiSchemaInfo: info,
ReorgMeta: nil,
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
InvolvingSchemaInfo: involvingSchemaInfo,
SQLMode: ctx.GetSessionVars().SQLMode,
}
if containsDistTaskSubJob(subJobs) {
job.ReorgMeta, err = newReorgMetaFromVariables(job, ctx)
if err != nil {
return err
}
} else {
job.ReorgMeta = NewDDLReorgMeta(ctx)
err = initJobReorgMetaFromVariables(job, ctx)
if err != nil {
return errors.Trace(err)
}

err = checkMultiSchemaInfo(info, t)
if err != nil {
return errors.Trace(err)
@@ -2027,16 +2021,6 @@ func (e *executor) multiSchemaChange(ctx sessionctx.Context, ti ast.Ident, info
return e.DoDDLJob(ctx, job)
}

func containsDistTaskSubJob(subJobs []*model.SubJob) bool {
for _, sub := range subJobs {
if sub.Type == model.ActionAddIndex ||
sub.Type == model.ActionAddPrimaryKey {
return true
}
}
return false
}

func (e *executor) RebaseAutoID(ctx sessionctx.Context, ident ast.Ident, newBase int64, tp autoid.AllocatorType, force bool) error {
schema, t, err := e.getSchemaAndTableByIdent(ident)
if err != nil {
@@ -2451,10 +2435,13 @@ func (e *executor) AlterTablePartitioning(ctx sessionctx.Context, ident ast.Iden
TableName: t.Meta().Name.L,
Type: model.ActionAlterTablePartitioning,
BinlogInfo: &model.HistoryInfo{},
ReorgMeta: NewDDLReorgMeta(ctx),
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
SQLMode: ctx.GetSessionVars().SQLMode,
}
err = initJobReorgMetaFromVariables(job, ctx)
if err != nil {
return err
}

args := &model.TablePartitionArgs{
PartNames: partNames,
@@ -2517,10 +2504,13 @@ func (e *executor) ReorganizePartitions(ctx sessionctx.Context, ident ast.Ident,
TableName: t.Meta().Name.L,
Type: model.ActionReorganizePartition,
BinlogInfo: &model.HistoryInfo{},
ReorgMeta: NewDDLReorgMeta(ctx),
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
SQLMode: ctx.GetSessionVars().SQLMode,
}
err = initJobReorgMetaFromVariables(job, ctx)
if err != nil {
return errors.Trace(err)
}
args := &model.TablePartitionArgs{
PartNames: partNames,
PartInfo: partInfo,
@@ -2583,10 +2573,13 @@ func (e *executor) RemovePartitioning(ctx sessionctx.Context, ident ast.Ident, s
TableName: meta.Name.L,
Type: model.ActionRemovePartitioning,
BinlogInfo: &model.HistoryInfo{},
ReorgMeta: NewDDLReorgMeta(ctx),
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
SQLMode: ctx.GetSessionVars().SQLMode,
}
err = initJobReorgMetaFromVariables(job, ctx)
if err != nil {
return errors.Trace(err)
}
args := &model.TablePartitionArgs{
PartNames: partNames,
PartInfo: partInfo,
@@ -3385,7 +3378,6 @@ func (e *executor) RenameColumn(ctx sessionctx.Context, ident ast.Ident, spec *a
TableName: tbl.Meta().Name.L,
Type: model.ActionModifyColumn,
BinlogInfo: &model.HistoryInfo{},
ReorgMeta: NewDDLReorgMeta(ctx),
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
SQLMode: ctx.GetSessionVars().SQLMode,
}
@@ -4645,11 +4637,10 @@ func (e *executor) CreatePrimaryKey(ctx sessionctx.Context, ti ast.Ident, indexN
OpType: model.OpAddIndex,
}

reorgMeta, err := newReorgMetaFromVariables(job, ctx)
err = initJobReorgMetaFromVariables(job, ctx)
if err != nil {
return err
}
job.ReorgMeta = reorgMeta

err = e.doDDLJob2(ctx, job, args)
return errors.Trace(err)
@@ -4758,10 +4749,7 @@ func (e *executor) createVectorIndex(ctx sessionctx.Context, ti ast.Ident, index
return errors.Trace(err)
}

job, err := buildAddIndexJobWithoutTypeAndArgs(ctx, schema, t)
if err != nil {
return errors.Trace(err)
}
job := buildAddIndexJobWithoutTypeAndArgs(ctx, schema, t)
job.Version = model.GetJobVerInUse()
job.Type = model.ActionAddVectorIndex
indexPartSpecifications[0].Expr = nil
@@ -4788,32 +4776,20 @@ func (e *executor) createVectorIndex(ctx sessionctx.Context, ti ast.Ident, index
return errors.Trace(err)
}

func buildAddIndexJobWithoutTypeAndArgs(ctx sessionctx.Context, schema *model.DBInfo, t table.Table) (*model.Job, error) {
tzName, tzOffset := ddlutil.GetTimeZone(ctx)
func buildAddIndexJobWithoutTypeAndArgs(ctx sessionctx.Context, schema *model.DBInfo, t table.Table) *model.Job {
charset, collate := ctx.GetSessionVars().GetCharsetInfo()
job := &model.Job{
SchemaID: schema.ID,
TableID: t.Meta().ID,
SchemaName: schema.Name.L,
TableName: t.Meta().Name.L,
BinlogInfo: &model.HistoryInfo{},
ReorgMeta: &model.DDLReorgMeta{
SQLMode: ctx.GetSessionVars().SQLMode,
Warnings: make(map[errors.ErrorID]*terror.Error),
WarningsCount: make(map[errors.ErrorID]int64),
Location: &model.TimeZoneLocation{Name: tzName, Offset: tzOffset},
},
Priority: ctx.GetSessionVars().DDLReorgPriority,
Charset: charset,
Collate: collate,
SQLMode: ctx.GetSessionVars().SQLMode,
Priority: ctx.GetSessionVars().DDLReorgPriority,
Charset: charset,
Collate: collate,
SQLMode: ctx.GetSessionVars().SQLMode,
}
reorgMeta, err := newReorgMetaFromVariables(job, ctx)
if err != nil {
return nil, errors.Trace(err)
}
job.ReorgMeta = reorgMeta
return job, nil
return job
}

func (e *executor) CreateIndex(ctx sessionctx.Context, stmt *ast.CreateIndexStmt) error {
@@ -4907,15 +4883,17 @@ func (e *executor) createIndex(ctx sessionctx.Context, ti ast.Ident, keyType ast
// global is set to 'false' is just there to be backwards compatible,
// to avoid unmarshal issues, it is now part of indexOption.
global := false
job, err := buildAddIndexJobWithoutTypeAndArgs(ctx, schema, t)
if err != nil {
return errors.Trace(err)
}
job := buildAddIndexJobWithoutTypeAndArgs(ctx, schema, t)

job.Version = model.GetJobVerInUse()
job.Type = model.ActionAddIndex
job.CDCWriteSource = ctx.GetSessionVars().CDCWriteSource

err = initJobReorgMetaFromVariables(job, ctx)
if err != nil {
return errors.Trace(err)
}

args := &model.ModifyIndexArgs{
IndexArgs: []*model.IndexArg{{
Unique: unique,
@@ -4937,44 +4915,61 @@ func (e *executor) createIndex(ctx sessionctx.Context, ti ast.Ident, keyType ast
return errors.Trace(err)
}

func newReorgMetaFromVariables(job *model.Job, sctx sessionctx.Context) (*model.DDLReorgMeta, error) {
reorgMeta := NewDDLReorgMeta(sctx)
reorgMeta.IsDistReorg = variable.EnableDistTask.Load()
reorgMeta.IsFastReorg = variable.EnableFastReorg.Load()
reorgMeta.TargetScope = variable.ServiceScope.Load()
if sv, ok := sctx.GetSessionVars().GetSystemVar(variable.TiDBDDLReorgWorkerCount); ok {
reorgMeta.Concurrency = variable.TidbOptInt(sv, 0)
func initJobReorgMetaFromVariables(job *model.Job, sctx sessionctx.Context) error {
m := NewDDLReorgMeta(sctx)
setReorgParam := func() {
if sv, ok := sctx.GetSessionVars().GetSystemVar(variable.TiDBDDLReorgWorkerCount); ok {
m.Concurrency = variable.TidbOptInt(sv, 0)
}
if sv, ok := sctx.GetSessionVars().GetSystemVar(variable.TiDBDDLReorgBatchSize); ok {
m.BatchSize = variable.TidbOptInt(sv, 0)
}
}
if sv, ok := sctx.GetSessionVars().GetSystemVar(variable.TiDBDDLReorgBatchSize); ok {
reorgMeta.BatchSize = variable.TidbOptInt(sv, 0)
setDistTaskParam := func() error {
m.IsDistReorg = variable.EnableDistTask.Load()
m.IsFastReorg = variable.EnableFastReorg.Load()
m.TargetScope = variable.ServiceScope.Load()
if hasSysDB(job) {
if m.IsDistReorg {
logutil.DDLLogger().Info("cannot use distributed task execution on system DB",
zap.Stringer("job", job))
}
m.IsDistReorg = false
m.IsFastReorg = false
failpoint.Inject("reorgMetaRecordFastReorgDisabled", func(_ failpoint.Value) {
LastReorgMetaFastReorgDisabled = true
})
}
if m.IsDistReorg && !m.IsFastReorg {
return dbterror.ErrUnsupportedDistTask
}
return nil
}

if reorgMeta.IsDistReorg && !reorgMeta.IsFastReorg {
return nil, dbterror.ErrUnsupportedDistTask
}
if hasSysDB(job) {
if reorgMeta.IsDistReorg {
logutil.DDLLogger().Info("cannot use distributed task execution on system DB",
zap.Stringer("job", job))
if job.Type == model.ActionAddIndex ||
job.Type == model.ActionAddPrimaryKey {
setReorgParam()
err := setDistTaskParam()
if err != nil {
return err
}
reorgMeta.IsDistReorg = false
reorgMeta.IsFastReorg = false
failpoint.Inject("reorgMetaRecordFastReorgDisabled", func(_ failpoint.Value) {
LastReorgMetaFastReorgDisabled = true
})
} else if job.MayNeedReorg() {
setReorgParam()
} else {
return nil
}

job.ReorgMeta = m
logutil.DDLLogger().Info("initialize reorg meta",
zap.String("jobSchema", job.SchemaName),
zap.String("jobTable", job.TableName),
zap.Stringer("jobType", job.Type),
zap.Bool("enableDistTask", reorgMeta.IsDistReorg),
zap.Bool("enableFastReorg", reorgMeta.IsFastReorg),
zap.String("targetScope", reorgMeta.TargetScope),
zap.Int("concurrency", reorgMeta.Concurrency),
zap.Int("batchSize", reorgMeta.BatchSize),
zap.Bool("enableDistTask", m.IsDistReorg),
zap.Bool("enableFastReorg", m.IsFastReorg),
zap.String("targetScope", m.TargetScope),
zap.Int("concurrency", m.Concurrency),
zap.Int("batchSize", m.BatchSize),
)
return reorgMeta, nil
return nil
}

// LastReorgMetaFastReorgDisabled is used for test.
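For reference, the call sites changed in this file now follow a single pattern: build the model.Job without a ReorgMeta and let initJobReorgMetaFromVariables fill it in (or skip it entirely for jobs that never reorganize data). A minimal sketch, assuming the surrounding executor context (schema, t, ctx) and using a placeholder action type:

// Sketch only; mirrors the call sites changed above.
job := &model.Job{
    SchemaID:       schema.ID,
    TableID:        t.Meta().ID,
    SchemaName:     schema.Name.L,
    TableName:      t.Meta().Name.L,
    Type:           model.ActionReorganizePartition, // any reorg-capable action
    BinlogInfo:     &model.HistoryInfo{},
    CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
    SQLMode:        ctx.GetSessionVars().SQLMode,
}
// ReorgMeta is no longer assigned at each call site; the helper reads the
// session/system variables and leaves job.ReorgMeta nil when the job type
// neither adds an index/primary key nor may need reorg.
if err := initJobReorgMetaFromVariables(job, ctx); err != nil {
    return errors.Trace(err)
}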
5 changes: 4 additions & 1 deletion pkg/ddl/modify_column.go
@@ -975,11 +975,14 @@ func GetModifiableColumnJob(
TableName: t.Meta().Name.L,
Type: model.ActionModifyColumn,
BinlogInfo: &model.HistoryInfo{},
ReorgMeta: NewDDLReorgMeta(sctx),
CtxVars: []any{needChangeColData},
CDCWriteSource: sctx.GetSessionVars().CDCWriteSource,
SQLMode: sctx.GetSessionVars().SQLMode,
}
err = initJobReorgMetaFromVariables(job, sctx)
if err != nil {
return nil, errors.Trace(err)
}

args := &model.ModifyColumnArgs{
Column: newCol.ColumnInfo,
1 change: 0 additions & 1 deletion pkg/ddl/multi_schema_change.go
@@ -194,7 +194,6 @@ func appendToSubJobs(m *model.MultiSchemaInfo, jobW *JobWrapper) error {
Revertible: true,
CtxVars: jobW.CtxVars,
ReorgTp: reorgTp,
UseCloud: false,
})
return nil
}
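With SubJob.UseCloud removed, the display layer reads the cloud-storage flag from the job-level ReorgMeta instead (see the pkg/executor/show_ddl_jobs.go hunk below), guarding against the nil ReorgMeta that non-reorg jobs now carry. A minimal sketch of that consumer side:

// Sketch of the consumer; matches the logic added in show_ddl_jobs.go.
var useDXF, isCloud bool
if job.ReorgMeta != nil { // nil for jobs that never reorganize data
    useDXF = job.ReorgMeta.IsDistReorg
    isCloud = job.ReorgMeta.UseCloudStorage
}
for _, subJob := range job.MultiSchemaInfo.SubJobs {
    comments := subJob.Comments(useDXF, isCloud) // rendered in the new COMMENTS column
    _ = comments
}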
50 changes: 9 additions & 41 deletions pkg/executor/show_ddl_jobs.go
@@ -231,7 +231,7 @@ func (e *DDLJobRetriever) appendJobToChunk(req *chunk.Chunk, job *model.Job, che
req.AppendInt64(0, job.ID)
req.AppendString(1, schemaName)
req.AppendString(2, tableName)
req.AppendString(3, job.Type.String()+showAddIdxReorgTp(job))
req.AppendString(3, job.Type.String())
req.AppendString(4, job.SchemaState.String())
req.AppendInt64(5, job.SchemaID)
req.AppendInt64(6, job.TableID)
@@ -249,12 +249,16 @@
}
req.AppendString(11, job.State.String())
if job.Type == model.ActionMultiSchemaChange {
isDistTask := job.ReorgMeta != nil && job.ReorgMeta.IsDistReorg
var useDXF, isCloud bool
if job.ReorgMeta != nil {
useDXF = job.ReorgMeta.IsDistReorg
isCloud = job.ReorgMeta.UseCloudStorage
}
for _, subJob := range job.MultiSchemaInfo.SubJobs {
req.AppendInt64(0, job.ID)
req.AppendString(1, schemaName)
req.AppendString(2, tableName)
req.AppendString(3, subJob.Type.String()+" /* subjob */"+showAddIdxReorgTpInSubJob(subJob, isDistTask))
req.AppendString(3, subJob.Type.String()+" /* subjob */")
req.AppendString(4, subJob.SchemaState.String())
req.AppendInt64(5, job.SchemaID)
req.AppendInt64(6, job.TableID)
@@ -272,46 +276,10 @@
req.AppendNull(10)
}
req.AppendString(11, subJob.State.String())
req.AppendString(12, subJob.Comments(useDXF, isCloud))
}
}
}

func showAddIdxReorgTp(job *model.Job) string {
if job.Type == model.ActionAddIndex || job.Type == model.ActionAddPrimaryKey {
if job.ReorgMeta != nil {
sb := strings.Builder{}
tp := job.ReorgMeta.ReorgTp.String()
if len(tp) > 0 {
sb.WriteString(" /* ")
sb.WriteString(tp)
if job.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge &&
job.ReorgMeta.IsDistReorg &&
job.ReorgMeta.UseCloudStorage {
sb.WriteString(" cloud")
}
sb.WriteString(" */")
}
return sb.String()
}
}
return ""
}

func showAddIdxReorgTpInSubJob(subJob *model.SubJob, useDistTask bool) string {
if subJob.Type == model.ActionAddIndex || subJob.Type == model.ActionAddPrimaryKey {
sb := strings.Builder{}
tp := subJob.ReorgTp.String()
if len(tp) > 0 {
sb.WriteString(" /* ")
sb.WriteString(tp)
if subJob.ReorgTp == model.ReorgTypeLitMerge && useDistTask && subJob.UseCloud {
sb.WriteString(" cloud")
}
sb.WriteString(" */")
}
return sb.String()
}
return ""
req.AppendString(12, job.Comments())
}

func ts2Time(timestamp uint64, loc *time.Location) types.Time {
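The deleted showAddIdxReorgTp helpers appended the reorg type (and, for distributed ingest with cloud storage, "cloud") as a /* ... */ suffix on the JOB_TYPE string; that information now surfaces through the new COMMENTS column via job.Comments() and subJob.Comments(useDXF, isCloud), which live in the model package and are not part of this diff. Purely as a hypothetical reconstruction of the kind of string they carry, based on the removed logic:

// Hypothetical sketch, not the actual model implementation.
func buildComments(reorgTp string, useDXF, useCloud bool) string {
    if reorgTp == "" {
        return ""
    }
    s := reorgTp // e.g. "ingest" for the fast-reorg path
    if useDXF && useCloud {
        s += ", cloud"
    }
    return s
}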