Commit

ddl: add COMMENTS column to DDL jobs and enhance job reorg meta handl…

tangenta authored and ti-chi-bot committed Nov 21, 2024
1 parent 9862e83 commit b751b95
Showing 23 changed files with 335 additions and 177 deletions.
2 changes: 1 addition & 1 deletion br/tests/br_partition_add_index/run.sh
@@ -53,7 +53,7 @@ run_br restore db --db $DB -s "local://$TEST_DIR/$DB" --pd $PD_ADDR

run_sql "ALTER TABLE $DB.t0 ADD INDEX idx(data);"

result=$(run_sql "ADMIN SHOW DDL JOBS 1 WHERE job_type LIKE '%ingest%';")
result=$(run_sql "ADMIN SHOW DDL JOBS 1 WHERE comments LIKE '%ingest%';")

run_sql "ADMIN SHOW DDL JOBS 1;"

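Note on this hunk: the commit stops appending markers such as "/* ingest */" to the JOB_TYPE value shown by ADMIN SHOW DDL JOBS and reports them in the new COMMENTS column instead, so the script's filter moves from job_type to comments. A minimal Go sketch of the same check as it might appear in a testkit-based test; the package, helper, and table/index names are illustrative, not part of this commit.

package ddltest

import (
	"testing"

	"github.com/pingcap/tidb/pkg/testkit"
	"github.com/stretchr/testify/require"
)

// requireIngestUsed asserts that the most recent add-index job carries the
// "ingest" marker in the new COMMENTS column of ADMIN SHOW DDL JOBS.
// (Illustrative helper; assumes a testkit session and a table test.t0.)
func requireIngestUsed(t *testing.T, tk *testkit.TestKit) {
	tk.MustExec("alter table test.t0 add index idx(data);")
	rows := tk.MustQuery(
		"admin show ddl jobs 1 where comments like '%ingest%';").Rows()
	require.Len(t, rows, 1)
}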
6 changes: 3 additions & 3 deletions br/tests/br_pitr_failpoint/run.sh
@@ -47,7 +47,7 @@ run_br --pd $PD_ADDR log start --task-name integration_test -s "local://$TEST_DI
# wait until the index creation is running
retry_cnt=0
while true; do
run_sql "ADMIN SHOW DDL JOBS WHERE DB_NAME = 'test' AND TABLE_NAME = 'pairs' AND STATE = 'running' AND SCHEMA_STATE = 'write reorganization' AND JOB_TYPE = 'add index /* ingest */';"
run_sql "ADMIN SHOW DDL JOBS WHERE DB_NAME = 'test' AND TABLE_NAME = 'pairs' AND STATE = 'running' AND SCHEMA_STATE = 'write reorganization' AND JOB_TYPE = 'add index';"
if grep -Fq "1. row" $res_file; then
break
fi
@@ -71,7 +71,7 @@ touch $hint_sig_file_public
# wait until the index creation is done
retry_cnt=0
while true; do
run_sql "ADMIN SHOW DDL JOBS WHERE DB_NAME = 'test' AND TABLE_NAME = 'pairs' AND STATE = 'done' AND SCHEMA_STATE = 'public' AND JOB_TYPE = 'add index /* ingest */';"
run_sql "ADMIN SHOW DDL JOBS WHERE DB_NAME = 'test' AND TABLE_NAME = 'pairs' AND STATE = 'done' AND SCHEMA_STATE = 'public' AND JOB_TYPE = 'add index';"
if grep -Fq "1. row" $res_file; then
break
fi
@@ -98,7 +98,7 @@ wait $sql_pid
# wait until the index creation is done
retry_cnt=0
while true; do
run_sql "ADMIN SHOW DDL JOBS WHERE DB_NAME = 'test' AND TABLE_NAME = 'pairs' AND STATE = 'synced' AND SCHEMA_STATE = 'public' AND JOB_TYPE = 'add index /* ingest */';"
run_sql "ADMIN SHOW DDL JOBS WHERE DB_NAME = 'test' AND TABLE_NAME = 'pairs' AND STATE = 'synced' AND SCHEMA_STATE = 'public' AND JOB_TYPE = 'add index';"
if grep -Fq "1. row" $res_file; then
break
fi
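The three wait loops in this script change only in that JOB_TYPE is now the plain "add index"; the ingest marker has moved to COMMENTS. For comparison, a rough Go equivalent of such a wait loop; the helper name, timeout, and schema/table names are assumptions, not part of the commit.

package ddltest

import (
	"errors"
	"time"

	"github.com/pingcap/tidb/pkg/testkit"
)

// waitAddIndexJob polls ADMIN SHOW DDL JOBS until the add-index job on
// test.pairs reaches the given state and schema state, mirroring the retry
// loop in the shell script. Note the filter uses job_type = 'add index'
// without the former "/* ingest */" suffix.
func waitAddIndexJob(tk *testkit.TestKit, state, schemaState string) error {
	query := "admin show ddl jobs where db_name = 'test' and table_name = 'pairs'" +
		" and state = '" + state + "' and schema_state = '" + schemaState + "'" +
		" and job_type = 'add index';"
	for i := 0; i < 100; i++ {
		if len(tk.MustQuery(query).Rows()) == 1 {
			return nil
		}
		time.Sleep(100 * time.Millisecond)
	}
	return errors.New("timed out waiting for add index job in state " + state)
}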
2 changes: 1 addition & 1 deletion pkg/ddl/column_modify_test.go
@@ -622,7 +622,7 @@ func TestModifyColumnReorgCheckpoint(t *testing.T) {
tk.MustExec("use test")
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")
tk.MustExec("set global tidb_ddl_reorg_worker_cnt = 1;")
tk.MustExec("set @@tidb_ddl_reorg_worker_cnt = 1;")
tk.MustExec("create table t (a int primary key, b bigint);")
rowCnt := 10
for i := 0; i < rowCnt; i++ {
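The switch from "set global" to a session-scoped "set @@" lines up with the reorg-meta change in this commit: the worker count and batch size are read from the submitting session's variables and captured into job.ReorgMeta when the DDL is queued, so the test has to tune them in the session that issues the statement. A hedged sketch of that pattern (helper, table, and values are illustrative):

package ddltest

import "github.com/pingcap/tidb/pkg/testkit"

// runReorgWithTunedSession shows the session-scoped tuning the test now relies
// on: the values of tidb_ddl_reorg_worker_cnt and tidb_ddl_reorg_batch_size in
// the submitting session are snapshotted into the job's reorg meta at submit
// time. (Illustrative values; assumes table test.t with a bigint column b.)
func runReorgWithTunedSession(tk *testkit.TestKit) {
	tk.MustExec("use test")
	tk.MustExec("set @@tidb_ddl_reorg_worker_cnt = 1;")
	tk.MustExec("set @@tidb_ddl_reorg_batch_size = 32;")
	tk.MustExec("alter table t modify column b smallint;")
}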
176 changes: 97 additions & 79 deletions pkg/ddl/executor.go
@@ -2005,20 +2005,14 @@ func (e *executor) multiSchemaChange(ctx sessionctx.Context, ti ast.Ident, info
Type: model.ActionMultiSchemaChange,
BinlogInfo: &model.HistoryInfo{},
MultiSchemaInfo: info,
ReorgMeta: nil,
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
InvolvingSchemaInfo: involvingSchemaInfo,
SQLMode: ctx.GetSessionVars().SQLMode,
}
if containsDistTaskSubJob(subJobs) {
job.ReorgMeta, err = newReorgMetaFromVariables(job, ctx)
if err != nil {
return err
}
} else {
job.ReorgMeta = NewDDLReorgMeta(ctx)
err = initJobReorgMetaFromVariables(job, ctx)
if err != nil {
return errors.Trace(err)
}

err = checkMultiSchemaInfo(info, t)
if err != nil {
return errors.Trace(err)
@@ -2027,16 +2021,6 @@ return e.DoDDLJob(ctx, job)
return e.DoDDLJob(ctx, job)
}

func containsDistTaskSubJob(subJobs []*model.SubJob) bool {
for _, sub := range subJobs {
if sub.Type == model.ActionAddIndex ||
sub.Type == model.ActionAddPrimaryKey {
return true
}
}
return false
}
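containsDistTaskSubJob is removed because initJobReorgMetaFromVariables (added further down in this file) now inspects the job type, and for multi-schema changes the sub-job types, and decides by itself which reorg and dist-task settings to record. Callers converge on one pattern, sketched below as a hypothetical helper inside package ddl; the helper name and the import paths (current pkg/ layout) are assumptions.

package ddl

import (
	"github.com/pingcap/errors"
	"github.com/pingcap/tidb/pkg/meta/model"
	"github.com/pingcap/tidb/pkg/sessionctx"
)

// submitWithReorgMeta condenses the new calling convention: build the job
// without ReorgMeta, let initJobReorgMetaFromVariables fill it in (or leave it
// nil for job types that never reorganize rows), then submit the job.
func submitWithReorgMeta(e *executor, ctx sessionctx.Context, job *model.Job) error {
	if err := initJobReorgMetaFromVariables(job, ctx); err != nil {
		return errors.Trace(err)
	}
	return e.DoDDLJob(ctx, job)
}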

func (e *executor) RebaseAutoID(ctx sessionctx.Context, ident ast.Ident, newBase int64, tp autoid.AllocatorType, force bool) error {
schema, t, err := e.getSchemaAndTableByIdent(ident)
if err != nil {
@@ -2451,10 +2435,13 @@ func (e *executor) AlterTablePartitioning(ctx sessionctx.Context, ident ast.Iden
TableName: t.Meta().Name.L,
Type: model.ActionAlterTablePartitioning,
BinlogInfo: &model.HistoryInfo{},
ReorgMeta: NewDDLReorgMeta(ctx),
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
SQLMode: ctx.GetSessionVars().SQLMode,
}
err = initJobReorgMetaFromVariables(job, ctx)
if err != nil {
return err
}

args := &model.TablePartitionArgs{
PartNames: partNames,
@@ -2517,10 +2504,13 @@ func (e *executor) ReorganizePartitions(ctx sessionctx.Context, ident ast.Ident,
TableName: t.Meta().Name.L,
Type: model.ActionReorganizePartition,
BinlogInfo: &model.HistoryInfo{},
ReorgMeta: NewDDLReorgMeta(ctx),
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
SQLMode: ctx.GetSessionVars().SQLMode,
}
err = initJobReorgMetaFromVariables(job, ctx)
if err != nil {
return errors.Trace(err)
}
args := &model.TablePartitionArgs{
PartNames: partNames,
PartInfo: partInfo,
@@ -2583,10 +2573,13 @@ func (e *executor) RemovePartitioning(ctx sessionctx.Context, ident ast.Ident, s
TableName: meta.Name.L,
Type: model.ActionRemovePartitioning,
BinlogInfo: &model.HistoryInfo{},
ReorgMeta: NewDDLReorgMeta(ctx),
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
SQLMode: ctx.GetSessionVars().SQLMode,
}
err = initJobReorgMetaFromVariables(job, ctx)
if err != nil {
return errors.Trace(err)
}
args := &model.TablePartitionArgs{
PartNames: partNames,
PartInfo: partInfo,
@@ -3385,10 +3378,13 @@ func (e *executor) RenameColumn(ctx sessionctx.Context, ident ast.Ident, spec *a
TableName: tbl.Meta().Name.L,
Type: model.ActionModifyColumn,
BinlogInfo: &model.HistoryInfo{},
ReorgMeta: NewDDLReorgMeta(ctx),
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
SQLMode: ctx.GetSessionVars().SQLMode,
}
err = initJobReorgMetaFromVariables(job, ctx)
if err != nil {
return err
}

args := &model.ModifyColumnArgs{
Column: newCol,
@@ -4645,11 +4641,10 @@ func (e *executor) CreatePrimaryKey(ctx sessionctx.Context, ti ast.Ident, indexN
OpType: model.OpAddIndex,
}

reorgMeta, err := newReorgMetaFromVariables(job, ctx)
err = initJobReorgMetaFromVariables(job, ctx)
if err != nil {
return err
}
job.ReorgMeta = reorgMeta

err = e.doDDLJob2(ctx, job, args)
return errors.Trace(err)
@@ -4758,10 +4753,7 @@ func (e *executor) createVectorIndex(ctx sessionctx.Context, ti ast.Ident, index
return errors.Trace(err)
}

job, err := buildAddIndexJobWithoutTypeAndArgs(ctx, schema, t)
if err != nil {
return errors.Trace(err)
}
job := buildAddIndexJobWithoutTypeAndArgs(ctx, schema, t)
job.Version = model.GetJobVerInUse()
job.Type = model.ActionAddVectorIndex
indexPartSpecifications[0].Expr = nil
@@ -4788,32 +4780,20 @@ return errors.Trace(err)
return errors.Trace(err)
}

func buildAddIndexJobWithoutTypeAndArgs(ctx sessionctx.Context, schema *model.DBInfo, t table.Table) (*model.Job, error) {
tzName, tzOffset := ddlutil.GetTimeZone(ctx)
func buildAddIndexJobWithoutTypeAndArgs(ctx sessionctx.Context, schema *model.DBInfo, t table.Table) *model.Job {
charset, collate := ctx.GetSessionVars().GetCharsetInfo()
job := &model.Job{
SchemaID: schema.ID,
TableID: t.Meta().ID,
SchemaName: schema.Name.L,
TableName: t.Meta().Name.L,
BinlogInfo: &model.HistoryInfo{},
ReorgMeta: &model.DDLReorgMeta{
SQLMode: ctx.GetSessionVars().SQLMode,
Warnings: make(map[errors.ErrorID]*terror.Error),
WarningsCount: make(map[errors.ErrorID]int64),
Location: &model.TimeZoneLocation{Name: tzName, Offset: tzOffset},
},
Priority: ctx.GetSessionVars().DDLReorgPriority,
Charset: charset,
Collate: collate,
SQLMode: ctx.GetSessionVars().SQLMode,
Priority: ctx.GetSessionVars().DDLReorgPriority,
Charset: charset,
Collate: collate,
SQLMode: ctx.GetSessionVars().SQLMode,
}
reorgMeta, err := newReorgMetaFromVariables(job, ctx)
if err != nil {
return nil, errors.Trace(err)
}
job.ReorgMeta = reorgMeta
return job, nil
return job
}
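Because buildAddIndexJobWithoutTypeAndArgs no longer builds the reorg meta, it cannot fail and drops its error return; the reorg meta is instead initialized after the caller has set the job type, which initJobReorgMetaFromVariables needs in order to pick the right settings. A condensed sketch of the resulting order of operations; the wrapper and the import paths (current pkg/ layout) are assumptions.

package ddl

import (
	"github.com/pingcap/errors"
	"github.com/pingcap/tidb/pkg/meta/model"
	"github.com/pingcap/tidb/pkg/sessionctx"
	"github.com/pingcap/tidb/pkg/table"
)

// buildAddIndexJob sketches the new ordering in createIndex: the bare job is
// built first, the type is assigned, and only then is the reorg meta derived
// from the session variables.
func buildAddIndexJob(ctx sessionctx.Context, schema *model.DBInfo, t table.Table) (*model.Job, error) {
	job := buildAddIndexJobWithoutTypeAndArgs(ctx, schema, t)
	job.Version = model.GetJobVerInUse()
	job.Type = model.ActionAddIndex
	job.CDCWriteSource = ctx.GetSessionVars().CDCWriteSource
	if err := initJobReorgMetaFromVariables(job, ctx); err != nil {
		return nil, errors.Trace(err)
	}
	return job, nil
}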

func (e *executor) CreateIndex(ctx sessionctx.Context, stmt *ast.CreateIndexStmt) error {
@@ -4907,15 +4887,17 @@ func (e *executor) createIndex(ctx sessionctx.Context, ti ast.Ident, keyType ast
// global is set to 'false' is just there to be backwards compatible,
// to avoid unmarshal issues, it is now part of indexOption.
global := false
job, err := buildAddIndexJobWithoutTypeAndArgs(ctx, schema, t)
if err != nil {
return errors.Trace(err)
}
job := buildAddIndexJobWithoutTypeAndArgs(ctx, schema, t)

job.Version = model.GetJobVerInUse()
job.Type = model.ActionAddIndex
job.CDCWriteSource = ctx.GetSessionVars().CDCWriteSource

err = initJobReorgMetaFromVariables(job, ctx)
if err != nil {
return errors.Trace(err)
}

args := &model.ModifyIndexArgs{
IndexArgs: []*model.IndexArg{{
Unique: unique,
@@ -4937,44 +4919,80 @@ func (e *executor) createIndex(ctx sessionctx.Context, ti ast.Ident, keyType ast
return errors.Trace(err)
}

func newReorgMetaFromVariables(job *model.Job, sctx sessionctx.Context) (*model.DDLReorgMeta, error) {
reorgMeta := NewDDLReorgMeta(sctx)
reorgMeta.IsDistReorg = variable.EnableDistTask.Load()
reorgMeta.IsFastReorg = variable.EnableFastReorg.Load()
reorgMeta.TargetScope = variable.ServiceScope.Load()
if sv, ok := sctx.GetSessionVars().GetSystemVar(variable.TiDBDDLReorgWorkerCount); ok {
reorgMeta.Concurrency = variable.TidbOptInt(sv, 0)
func initJobReorgMetaFromVariables(job *model.Job, sctx sessionctx.Context) error {
m := NewDDLReorgMeta(sctx)
setReorgParam := func() {
if sv, ok := sctx.GetSessionVars().GetSystemVar(variable.TiDBDDLReorgWorkerCount); ok {
m.Concurrency = variable.TidbOptInt(sv, 0)
}
if sv, ok := sctx.GetSessionVars().GetSystemVar(variable.TiDBDDLReorgBatchSize); ok {
m.BatchSize = variable.TidbOptInt(sv, 0)
}
}
if sv, ok := sctx.GetSessionVars().GetSystemVar(variable.TiDBDDLReorgBatchSize); ok {
reorgMeta.BatchSize = variable.TidbOptInt(sv, 0)
setDistTaskParam := func() error {
m.IsDistReorg = variable.EnableDistTask.Load()
m.IsFastReorg = variable.EnableFastReorg.Load()
m.TargetScope = variable.ServiceScope.Load()
if hasSysDB(job) {
if m.IsDistReorg {
logutil.DDLLogger().Info("cannot use distributed task execution on system DB",
zap.Stringer("job", job))
}
m.IsDistReorg = false
m.IsFastReorg = false
failpoint.Inject("reorgMetaRecordFastReorgDisabled", func(_ failpoint.Value) {
LastReorgMetaFastReorgDisabled = true
})
}
if m.IsDistReorg && !m.IsFastReorg {
return dbterror.ErrUnsupportedDistTask
}
return nil
}

if reorgMeta.IsDistReorg && !reorgMeta.IsFastReorg {
return nil, dbterror.ErrUnsupportedDistTask
}
if hasSysDB(job) {
if reorgMeta.IsDistReorg {
logutil.DDLLogger().Info("cannot use distributed task execution on system DB",
zap.Stringer("job", job))
switch job.Type {
case model.ActionAddIndex, model.ActionAddPrimaryKey:
setReorgParam()
err := setDistTaskParam()
if err != nil {
return err
}
reorgMeta.IsDistReorg = false
reorgMeta.IsFastReorg = false
failpoint.Inject("reorgMetaRecordFastReorgDisabled", func(_ failpoint.Value) {
LastReorgMetaFastReorgDisabled = true
})
case model.ActionReorganizePartition,
model.ActionRemovePartitioning,
model.ActionAlterTablePartitioning,
model.ActionModifyColumn:
setReorgParam()
case model.ActionMultiSchemaChange:
for _, sub := range job.MultiSchemaInfo.SubJobs {
switch sub.Type {
case model.ActionAddIndex, model.ActionAddPrimaryKey:
setReorgParam()
err := setDistTaskParam()
if err != nil {
return err
}
case model.ActionReorganizePartition,
model.ActionRemovePartitioning,
model.ActionAlterTablePartitioning,
model.ActionModifyColumn:
setReorgParam()
}
}
default:
return nil
}

job.ReorgMeta = m
logutil.DDLLogger().Info("initialize reorg meta",
zap.String("jobSchema", job.SchemaName),
zap.String("jobTable", job.TableName),
zap.Stringer("jobType", job.Type),
zap.Bool("enableDistTask", reorgMeta.IsDistReorg),
zap.Bool("enableFastReorg", reorgMeta.IsFastReorg),
zap.String("targetScope", reorgMeta.TargetScope),
zap.Int("concurrency", reorgMeta.Concurrency),
zap.Int("batchSize", reorgMeta.BatchSize),
zap.Bool("enableDistTask", m.IsDistReorg),
zap.Bool("enableFastReorg", m.IsFastReorg),
zap.String("targetScope", m.TargetScope),
zap.Int("concurrency", m.Concurrency),
zap.Int("batchSize", m.BatchSize),
)
return reorgMeta, nil
return nil
}

// LastReorgMetaFastReorgDisabled is used for test.
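Inside initJobReorgMetaFromVariables the order of checks also changes: the system-database downgrade, which forces both flags off, now runs before the "distributed task requires fast reorg" validation, so DDL on system tables no longer trips that error. The validation still applies to user tables, which a test could exercise roughly like this; the test body is a sketch and only asserts that the statement is rejected (the concrete error is dbterror.ErrUnsupportedDistTask).

package ddltest

import (
	"testing"

	"github.com/pingcap/tidb/pkg/testkit"
	"github.com/stretchr/testify/require"
)

// addIndexRejectsDistTaskWithoutFastReorg sketches the guard kept by this
// commit: enabling the distributed task framework while fast reorg is off
// makes an ADD INDEX on a user table fail at submission time.
// (Illustrative test body; assumes table test.t with a column b exists.)
func addIndexRejectsDistTaskWithoutFastReorg(t *testing.T, tk *testkit.TestKit) {
	tk.MustExec("use test")
	tk.MustExec("set global tidb_enable_dist_task = on;")
	tk.MustExec("set global tidb_ddl_enable_fast_reorg = off;")
	require.Error(t, tk.ExecToErr("alter table t add index idx(b);"))
}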
2 changes: 1 addition & 1 deletion pkg/ddl/index.go
@@ -1274,7 +1274,7 @@ func pickBackfillType(job *model.Job) (model.ReorgType, error) {
func loadCloudStorageURI(w *worker, job *model.Job) {
jc := w.jobContext(job.ID, job.ReorgMeta)
jc.cloudStorageURI = variable.CloudStorageURI.Load()
job.ReorgMeta.UseCloudStorage = len(jc.cloudStorageURI) > 0
job.ReorgMeta.UseCloudStorage = len(jc.cloudStorageURI) > 0 && job.ReorgMeta.IsDistReorg
}

func doReorgWorkForCreateIndexMultiSchema(w *worker, jobCtx *jobContext, job *model.Job,
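The one-line change in loadCloudStorageURI tightens when global sort is considered: a configured cloud storage URI alone no longer enables it, the job must also run as a distributed reorg. The gate reduces to the predicate below, written as a hypothetical standalone helper for illustration only.

package ddl

// shouldUseCloudStorage mirrors the condition now stored in
// job.ReorgMeta.UseCloudStorage: cloud-storage-backed (global sort) ingest is
// used only when a URI is configured and the job is a distributed reorg.
func shouldUseCloudStorage(cloudStorageURI string, isDistReorg bool) bool {
	return len(cloudStorageURI) > 0 && isDistReorg
}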
6 changes: 3 additions & 3 deletions pkg/ddl/ingest/integration_test.go
@@ -45,7 +45,7 @@ func TestAddIndexIngestGeneratedColumns(t *testing.T) {
require.Len(t, rows, n)
for i := 0; i < n; i++ {
//nolint: forcetypeassert
jobTp := rows[i][3].(string)
jobTp := rows[i][12].(string)
require.True(t, strings.Contains(jobTp, "ingest"), jobTp)
}
}
@@ -100,7 +100,7 @@ func TestIngestError(t *testing.T) {
tk.MustExec("admin check table t;")
rows := tk.MustQuery("admin show ddl jobs 1;").Rows()
//nolint: forcetypeassert
jobTp := rows[0][3].(string)
jobTp := rows[0][12].(string)
require.True(t, strings.Contains(jobTp, "ingest"), jobTp)

tk.MustExec("drop table t;")
@@ -116,7 +116,7 @@ func TestIngestError(t *testing.T) {
tk.MustExec("admin check table t;")
rows = tk.MustQuery("admin show ddl jobs 1;").Rows()
//nolint: forcetypeassert
jobTp = rows[0][3].(string)
jobTp = rows[0][12].(string)
require.True(t, strings.Contains(jobTp, "ingest"), jobTp)
}

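The index change from rows[i][3] to rows[i][12] follows from the commit title: the ingest marker is no longer part of JOB_TYPE (column index 3) but is reported in the new COMMENTS column, which these tests read at index 12 of the ADMIN SHOW DDL JOBS result; the local variable is still named jobTp even though it now holds the comments string. Keeping that magic number in one place would be a small follow-up cleanup, sketched below with an illustrative helper that is not part of this commit.

package ddltest

// ddlJobsCommentsIdx is the position of the COMMENTS column in the
// ADMIN SHOW DDL JOBS result as read by these tests.
const ddlJobsCommentsIdx = 12

// jobComments extracts the COMMENTS cell from one result row.
// (Illustrative helper only.)
func jobComments(row []any) string {
	//nolint: forcetypeassert
	return row[ddlJobsCommentsIdx].(string)
}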
5 changes: 4 additions & 1 deletion pkg/ddl/modify_column.go
@@ -975,11 +975,14 @@ func GetModifiableColumnJob(
TableName: t.Meta().Name.L,
Type: model.ActionModifyColumn,
BinlogInfo: &model.HistoryInfo{},
ReorgMeta: NewDDLReorgMeta(sctx),
CtxVars: []any{needChangeColData},
CDCWriteSource: sctx.GetSessionVars().CDCWriteSource,
SQLMode: sctx.GetSessionVars().SQLMode,
}
err = initJobReorgMetaFromVariables(job, sctx)
if err != nil {
return nil, errors.Trace(err)
}

args := &model.ModifyColumnArgs{
Column: newCol.ColumnInfo,
(The remaining changed files are not expanded on this page.)
