diff --git a/br/tests/br_partition_add_index/run.sh b/br/tests/br_partition_add_index/run.sh
index 0a146cb515562..84dcc554765e5 100644
--- a/br/tests/br_partition_add_index/run.sh
+++ b/br/tests/br_partition_add_index/run.sh
@@ -53,7 +53,7 @@ run_br restore db --db $DB -s "local://$TEST_DIR/$DB" --pd $PD_ADDR
 run_sql "ALTER TABLE $DB.t0 ADD INDEX idx(data);"
-result=$(run_sql "ADMIN SHOW DDL JOBS 1 WHERE job_type LIKE '%ingest%';")
+result=$(run_sql "ADMIN SHOW DDL JOBS 1 WHERE comments LIKE '%ingest%';")
 run_sql "ADMIN SHOW DDL JOBS 1;"
diff --git a/br/tests/br_pitr_failpoint/run.sh b/br/tests/br_pitr_failpoint/run.sh
index dc6e9b463367e..8a10e74ab81fe 100644
--- a/br/tests/br_pitr_failpoint/run.sh
+++ b/br/tests/br_pitr_failpoint/run.sh
@@ -47,7 +47,7 @@ run_br --pd $PD_ADDR log start --task-name integration_test -s "local://$TEST_DI
 # wait until the index creation is running
 retry_cnt=0
 while true; do
-    run_sql "ADMIN SHOW DDL JOBS WHERE DB_NAME = 'test' AND TABLE_NAME = 'pairs' AND STATE = 'running' AND SCHEMA_STATE = 'write reorganization' AND JOB_TYPE = 'add index /* ingest */';"
+    run_sql "ADMIN SHOW DDL JOBS WHERE DB_NAME = 'test' AND TABLE_NAME = 'pairs' AND STATE = 'running' AND SCHEMA_STATE = 'write reorganization' AND JOB_TYPE = 'add index';"
     if grep -Fq "1. row" $res_file; then
         break
     fi
@@ -71,7 +71,7 @@ touch $hint_sig_file_public
 # wait until the index creation is done
 retry_cnt=0
 while true; do
-    run_sql "ADMIN SHOW DDL JOBS WHERE DB_NAME = 'test' AND TABLE_NAME = 'pairs' AND STATE = 'done' AND SCHEMA_STATE = 'public' AND JOB_TYPE = 'add index /* ingest */';"
+    run_sql "ADMIN SHOW DDL JOBS WHERE DB_NAME = 'test' AND TABLE_NAME = 'pairs' AND STATE = 'done' AND SCHEMA_STATE = 'public' AND JOB_TYPE = 'add index';"
     if grep -Fq "1. row" $res_file; then
         break
     fi
@@ -98,7 +98,7 @@ wait $sql_pid
 # wait until the index creation is done
 retry_cnt=0
 while true; do
-    run_sql "ADMIN SHOW DDL JOBS WHERE DB_NAME = 'test' AND TABLE_NAME = 'pairs' AND STATE = 'synced' AND SCHEMA_STATE = 'public' AND JOB_TYPE = 'add index /* ingest */';"
+    run_sql "ADMIN SHOW DDL JOBS WHERE DB_NAME = 'test' AND TABLE_NAME = 'pairs' AND STATE = 'synced' AND SCHEMA_STATE = 'public' AND JOB_TYPE = 'add index';"
     if grep -Fq "1. row" $res_file; then
         break
     fi
diff --git a/pkg/ddl/column_modify_test.go b/pkg/ddl/column_modify_test.go
index a70fa6def454e..5f3c945820346 100644
--- a/pkg/ddl/column_modify_test.go
+++ b/pkg/ddl/column_modify_test.go
@@ -622,7 +622,7 @@ func TestModifyColumnReorgCheckpoint(t *testing.T) {
     tk.MustExec("use test")
     tk2 := testkit.NewTestKit(t, store)
     tk2.MustExec("use test")
-    tk.MustExec("set global tidb_ddl_reorg_worker_cnt = 1;")
+    tk.MustExec("set @@tidb_ddl_reorg_worker_cnt = 1;")
     tk.MustExec("create table t (a int primary key, b bigint);")
     rowCnt := 10
     for i := 0; i < rowCnt; i++ {
diff --git a/pkg/ddl/executor.go b/pkg/ddl/executor.go
index 0a3c0ceb846ad..2636f675014fc 100644
--- a/pkg/ddl/executor.go
+++ b/pkg/ddl/executor.go
@@ -2005,20 +2005,14 @@ func (e *executor) multiSchemaChange(ctx sessionctx.Context, ti ast.Ident, info
         Type:                model.ActionMultiSchemaChange,
         BinlogInfo:          &model.HistoryInfo{},
         MultiSchemaInfo:     info,
-        ReorgMeta:           nil,
         CDCWriteSource:      ctx.GetSessionVars().CDCWriteSource,
         InvolvingSchemaInfo: involvingSchemaInfo,
         SQLMode:             ctx.GetSessionVars().SQLMode,
     }
-    if containsDistTaskSubJob(subJobs) {
-        job.ReorgMeta, err = newReorgMetaFromVariables(job, ctx)
-        if err != nil {
-            return err
-        }
-    } else {
-        job.ReorgMeta = NewDDLReorgMeta(ctx)
+    err = initJobReorgMetaFromVariables(job, ctx)
+    if err != nil {
+        return errors.Trace(err)
     }
-
     err = checkMultiSchemaInfo(info, t)
     if err != nil {
         return errors.Trace(err)
@@ -2027,16 +2021,6 @@
     return e.DoDDLJob(ctx, job)
 }
 
-func containsDistTaskSubJob(subJobs []*model.SubJob) bool {
-    for _, sub := range subJobs {
-        if sub.Type == model.ActionAddIndex ||
-            sub.Type == model.ActionAddPrimaryKey {
-            return true
-        }
-    }
-    return false
-}
-
 func (e *executor) RebaseAutoID(ctx sessionctx.Context, ident ast.Ident, newBase int64, tp autoid.AllocatorType, force bool) error {
     schema, t, err := e.getSchemaAndTableByIdent(ident)
     if err != nil {
@@ -2451,10 +2435,13 @@ func (e *executor) AlterTablePartitioning(ctx sessionctx.Context, ident ast.Iden
         TableName:      t.Meta().Name.L,
         Type:           model.ActionAlterTablePartitioning,
         BinlogInfo:     &model.HistoryInfo{},
-        ReorgMeta:      NewDDLReorgMeta(ctx),
         CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
         SQLMode:        ctx.GetSessionVars().SQLMode,
     }
+    err = initJobReorgMetaFromVariables(job, ctx)
+    if err != nil {
+        return err
+    }
 
     args := &model.TablePartitionArgs{
         PartNames: partNames,
@@ -2517,10 +2504,13 @@ func (e *executor) ReorganizePartitions(ctx sessionctx.Context, ident ast.Ident,
         TableName:      t.Meta().Name.L,
         Type:           model.ActionReorganizePartition,
         BinlogInfo:     &model.HistoryInfo{},
-        ReorgMeta:      NewDDLReorgMeta(ctx),
         CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
         SQLMode:        ctx.GetSessionVars().SQLMode,
     }
+    err = initJobReorgMetaFromVariables(job, ctx)
+    if err != nil {
+        return errors.Trace(err)
+    }
     args := &model.TablePartitionArgs{
         PartNames: partNames,
         PartInfo:  partInfo,
@@ -2583,10 +2573,13 @@ func (e *executor) RemovePartitioning(ctx sessionctx.Context, ident ast.Ident, s
         TableName:      meta.Name.L,
         Type:           model.ActionRemovePartitioning,
         BinlogInfo:     &model.HistoryInfo{},
-        ReorgMeta:      NewDDLReorgMeta(ctx),
         CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
         SQLMode:        ctx.GetSessionVars().SQLMode,
     }
+    err = initJobReorgMetaFromVariables(job, ctx)
+    if err != nil {
+        return errors.Trace(err)
+    }
     args := &model.TablePartitionArgs{
         PartNames: partNames,
         PartInfo:  partInfo,
@@ -3385,10 +3378,13 @@ func (e *executor) RenameColumn(ctx sessionctx.Context, ident ast.Ident, spec *a
         TableName:      tbl.Meta().Name.L,
         Type:           model.ActionModifyColumn,
         BinlogInfo:     &model.HistoryInfo{},
-        ReorgMeta:      NewDDLReorgMeta(ctx),
         CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
         SQLMode:        ctx.GetSessionVars().SQLMode,
     }
+    err = initJobReorgMetaFromVariables(job, ctx)
+    if err != nil {
+        return err
+    }
 
     args := &model.ModifyColumnArgs{
         Column: newCol,
@@ -4645,11 +4641,10 @@ func (e *executor) CreatePrimaryKey(ctx sessionctx.Context, ti ast.Ident, indexN
         OpType: model.OpAddIndex,
     }
 
-    reorgMeta, err := newReorgMetaFromVariables(job, ctx)
+    err = initJobReorgMetaFromVariables(job, ctx)
     if err != nil {
         return err
     }
-    job.ReorgMeta = reorgMeta
 
     err = e.doDDLJob2(ctx, job, args)
     return errors.Trace(err)
@@ -4758,10 +4753,7 @@ func (e *executor) createVectorIndex(ctx sessionctx.Context, ti ast.Ident, index
         return errors.Trace(err)
     }
 
-    job, err := buildAddIndexJobWithoutTypeAndArgs(ctx, schema, t)
-    if err != nil {
-        return errors.Trace(err)
-    }
+    job := buildAddIndexJobWithoutTypeAndArgs(ctx, schema, t)
     job.Version = model.GetJobVerInUse()
     job.Type = model.ActionAddVectorIndex
     indexPartSpecifications[0].Expr = nil
@@ -4788,8 +4780,7 @@ func (e *executor) createVectorIndex(ctx sessionctx.Context, ti ast.Ident, index
     return errors.Trace(err)
 }
 
-func buildAddIndexJobWithoutTypeAndArgs(ctx sessionctx.Context, schema *model.DBInfo, t table.Table) (*model.Job, error) {
-    tzName, tzOffset := ddlutil.GetTimeZone(ctx)
+func buildAddIndexJobWithoutTypeAndArgs(ctx sessionctx.Context, schema *model.DBInfo, t table.Table) *model.Job {
     charset, collate := ctx.GetSessionVars().GetCharsetInfo()
     job := &model.Job{
         SchemaID:   schema.ID,
@@ -4797,23 +4788,12 @@ func buildAddIndexJobWithoutTypeAndArgs(ctx sessionctx.Context, schema *model.DB
         SchemaName: schema.Name.L,
         TableName:  t.Meta().Name.L,
         BinlogInfo: &model.HistoryInfo{},
-        ReorgMeta: &model.DDLReorgMeta{
-            SQLMode:       ctx.GetSessionVars().SQLMode,
-            Warnings:      make(map[errors.ErrorID]*terror.Error),
-            WarningsCount: make(map[errors.ErrorID]int64),
-            Location:      &model.TimeZoneLocation{Name: tzName, Offset: tzOffset},
-        },
-        Priority: ctx.GetSessionVars().DDLReorgPriority,
-        Charset:  charset,
-        Collate:  collate,
-        SQLMode:  ctx.GetSessionVars().SQLMode,
+        Priority:   ctx.GetSessionVars().DDLReorgPriority,
+        Charset:    charset,
+        Collate:    collate,
+        SQLMode:    ctx.GetSessionVars().SQLMode,
     }
-    reorgMeta, err := newReorgMetaFromVariables(job, ctx)
-    if err != nil {
-        return nil, errors.Trace(err)
-    }
-    job.ReorgMeta = reorgMeta
-    return job, nil
+    return job
 }
 
 func (e *executor) CreateIndex(ctx sessionctx.Context, stmt *ast.CreateIndexStmt) error {
@@ -4907,15 +4887,17 @@ func (e *executor) createIndex(ctx sessionctx.Context, ti ast.Ident, keyType ast
     // global is set to 'false' is just there to be backwards compatible,
     // to avoid unmarshal issues, it is now part of indexOption.
     global := false
-    job, err := buildAddIndexJobWithoutTypeAndArgs(ctx, schema, t)
-    if err != nil {
-        return errors.Trace(err)
-    }
+    job := buildAddIndexJobWithoutTypeAndArgs(ctx, schema, t)
     job.Version = model.GetJobVerInUse()
     job.Type = model.ActionAddIndex
     job.CDCWriteSource = ctx.GetSessionVars().CDCWriteSource
 
+    err = initJobReorgMetaFromVariables(job, ctx)
+    if err != nil {
+        return errors.Trace(err)
+    }
+
     args := &model.ModifyIndexArgs{
         IndexArgs: []*model.IndexArg{{
             Unique: unique,
@@ -4937,44 +4919,80 @@ func (e *executor) createIndex(ctx sessionctx.Context, ti ast.Ident, keyType ast
     return errors.Trace(err)
 }
 
-func newReorgMetaFromVariables(job *model.Job, sctx sessionctx.Context) (*model.DDLReorgMeta, error) {
-    reorgMeta := NewDDLReorgMeta(sctx)
-    reorgMeta.IsDistReorg = variable.EnableDistTask.Load()
-    reorgMeta.IsFastReorg = variable.EnableFastReorg.Load()
-    reorgMeta.TargetScope = variable.ServiceScope.Load()
-    if sv, ok := sctx.GetSessionVars().GetSystemVar(variable.TiDBDDLReorgWorkerCount); ok {
-        reorgMeta.Concurrency = variable.TidbOptInt(sv, 0)
+func initJobReorgMetaFromVariables(job *model.Job, sctx sessionctx.Context) error {
+    m := NewDDLReorgMeta(sctx)
+    setReorgParam := func() {
+        if sv, ok := sctx.GetSessionVars().GetSystemVar(variable.TiDBDDLReorgWorkerCount); ok {
+            m.Concurrency = variable.TidbOptInt(sv, 0)
+        }
+        if sv, ok := sctx.GetSessionVars().GetSystemVar(variable.TiDBDDLReorgBatchSize); ok {
+            m.BatchSize = variable.TidbOptInt(sv, 0)
+        }
     }
-    if sv, ok := sctx.GetSessionVars().GetSystemVar(variable.TiDBDDLReorgBatchSize); ok {
-        reorgMeta.BatchSize = variable.TidbOptInt(sv, 0)
+    setDistTaskParam := func() error {
+        m.IsDistReorg = variable.EnableDistTask.Load()
+        m.IsFastReorg = variable.EnableFastReorg.Load()
+        m.TargetScope = variable.ServiceScope.Load()
+        if hasSysDB(job) {
+            if m.IsDistReorg {
+                logutil.DDLLogger().Info("cannot use distributed task execution on system DB",
+                    zap.Stringer("job", job))
+            }
+            m.IsDistReorg = false
+            m.IsFastReorg = false
+            failpoint.Inject("reorgMetaRecordFastReorgDisabled", func(_ failpoint.Value) {
+                LastReorgMetaFastReorgDisabled = true
+            })
+        }
+        if m.IsDistReorg && !m.IsFastReorg {
+            return dbterror.ErrUnsupportedDistTask
+        }
+        return nil
     }
-    if reorgMeta.IsDistReorg && !reorgMeta.IsFastReorg {
-        return nil, dbterror.ErrUnsupportedDistTask
-    }
-    if hasSysDB(job) {
-        if reorgMeta.IsDistReorg {
-            logutil.DDLLogger().Info("cannot use distributed task execution on system DB",
-                zap.Stringer("job", job))
+    switch job.Type {
+    case model.ActionAddIndex, model.ActionAddPrimaryKey:
+        setReorgParam()
+        err := setDistTaskParam()
+        if err != nil {
+            return err
         }
-        reorgMeta.IsDistReorg = false
-        reorgMeta.IsFastReorg = false
-        failpoint.Inject("reorgMetaRecordFastReorgDisabled", func(_ failpoint.Value) {
-            LastReorgMetaFastReorgDisabled = true
-        })
+    case model.ActionReorganizePartition,
+        model.ActionRemovePartitioning,
+        model.ActionAlterTablePartitioning,
+        model.ActionModifyColumn:
+        setReorgParam()
+    case model.ActionMultiSchemaChange:
+        for _, sub := range job.MultiSchemaInfo.SubJobs {
+            switch sub.Type {
+            case model.ActionAddIndex, model.ActionAddPrimaryKey:
+                setReorgParam()
+                err := setDistTaskParam()
+                if err != nil {
+                    return err
+                }
+            case model.ActionReorganizePartition,
+                model.ActionRemovePartitioning,
+                model.ActionAlterTablePartitioning,
+                model.ActionModifyColumn:
+                setReorgParam()
+            }
+        }
+    default:
+        return nil
     }
-
+    job.ReorgMeta = m
     logutil.DDLLogger().Info("initialize reorg meta",
         zap.String("jobSchema", job.SchemaName),
         zap.String("jobTable", job.TableName),
         zap.Stringer("jobType", job.Type),
-        zap.Bool("enableDistTask", reorgMeta.IsDistReorg),
-        zap.Bool("enableFastReorg", reorgMeta.IsFastReorg),
-        zap.String("targetScope", reorgMeta.TargetScope),
-        zap.Int("concurrency", reorgMeta.Concurrency),
-        zap.Int("batchSize", reorgMeta.BatchSize),
+        zap.Bool("enableDistTask", m.IsDistReorg),
+        zap.Bool("enableFastReorg", m.IsFastReorg),
+        zap.String("targetScope", m.TargetScope),
+        zap.Int("concurrency", m.Concurrency),
+        zap.Int("batchSize", m.BatchSize),
     )
-    return reorgMeta, nil
+    return nil
 }
 
 // LastReorgMetaFastReorgDisabled is used for test.
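The pkg/ddl/executor.go hunks above replace the per-call-site ReorgMeta wiring (newReorgMetaFromVariables plus NewDDLReorgMeta) with a single initJobReorgMetaFromVariables that dispatches on job.Type, and on sub-job types for multi-schema change. A minimal sketch of the resulting call pattern at a job-building site follows; apart from initJobReorgMetaFromVariables and the model/errors packages, the identifiers (schema, tbl, ctx) are placeholders for whatever the concrete executor method has in scope, not new API.

// Sketch only: reorg-capable DDL jobs no longer set ReorgMeta inline.
// The helper fills job.ReorgMeta from session variables for reorg job types
// and leaves it nil for job types that never reorganize data.
job := &model.Job{
    SchemaID:       schema.ID,
    TableID:        tbl.Meta().ID,
    SchemaName:     schema.Name.L,
    TableName:      tbl.Meta().Name.L,
    Type:           model.ActionModifyColumn, // any reorg-capable action
    BinlogInfo:     &model.HistoryInfo{},
    CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
    SQLMode:        ctx.GetSessionVars().SQLMode,
}
if err := initJobReorgMetaFromVariables(job, ctx); err != nil {
    return errors.Trace(err)
}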
diff --git a/pkg/ddl/index.go b/pkg/ddl/index.go
index 0a733715625d5..c58bba3d3c157 100644
--- a/pkg/ddl/index.go
+++ b/pkg/ddl/index.go
@@ -1274,7 +1274,7 @@ func pickBackfillType(job *model.Job) (model.ReorgType, error) {
 func loadCloudStorageURI(w *worker, job *model.Job) {
     jc := w.jobContext(job.ID, job.ReorgMeta)
     jc.cloudStorageURI = variable.CloudStorageURI.Load()
-    job.ReorgMeta.UseCloudStorage = len(jc.cloudStorageURI) > 0
+    job.ReorgMeta.UseCloudStorage = len(jc.cloudStorageURI) > 0 && job.ReorgMeta.IsDistReorg
 }
 
 func doReorgWorkForCreateIndexMultiSchema(w *worker, jobCtx *jobContext, job *model.Job,
diff --git a/pkg/ddl/ingest/integration_test.go b/pkg/ddl/ingest/integration_test.go
index 618d2d97799fe..8557a62936080 100644
--- a/pkg/ddl/ingest/integration_test.go
+++ b/pkg/ddl/ingest/integration_test.go
@@ -45,7 +45,7 @@ func TestAddIndexIngestGeneratedColumns(t *testing.T) {
         require.Len(t, rows, n)
         for i := 0; i < n; i++ {
             //nolint: forcetypeassert
-            jobTp := rows[i][3].(string)
+            jobTp := rows[i][12].(string)
             require.True(t, strings.Contains(jobTp, "ingest"), jobTp)
         }
     }
@@ -100,7 +100,7 @@ func TestIngestError(t *testing.T) {
     tk.MustExec("admin check table t;")
     rows := tk.MustQuery("admin show ddl jobs 1;").Rows()
     //nolint: forcetypeassert
-    jobTp := rows[0][3].(string)
+    jobTp := rows[0][12].(string)
     require.True(t, strings.Contains(jobTp, "ingest"), jobTp)
 
     tk.MustExec("drop table t;")
@@ -116,7 +116,7 @@ func TestIngestError(t *testing.T) {
     tk.MustExec("admin check table t;")
     rows = tk.MustQuery("admin show ddl jobs 1;").Rows()
     //nolint: forcetypeassert
-    jobTp = rows[0][3].(string)
+    jobTp = rows[0][12].(string)
     require.True(t, strings.Contains(jobTp, "ingest"), jobTp)
 }
diff --git a/pkg/ddl/modify_column.go b/pkg/ddl/modify_column.go
index fadcc8bd4723b..8de8adced1733 100644
--- a/pkg/ddl/modify_column.go
+++ b/pkg/ddl/modify_column.go
@@ -975,11 +975,14 @@ func GetModifiableColumnJob(
         TableName:      t.Meta().Name.L,
         Type:           model.ActionModifyColumn,
         BinlogInfo:     &model.HistoryInfo{},
-        ReorgMeta:      NewDDLReorgMeta(sctx),
         CtxVars:        []any{needChangeColData},
         CDCWriteSource: sctx.GetSessionVars().CDCWriteSource,
         SQLMode:        sctx.GetSessionVars().SQLMode,
     }
+    err = initJobReorgMetaFromVariables(job, sctx)
+    if err != nil {
+        return nil, errors.Trace(err)
+    }
 
     args := &model.ModifyColumnArgs{
         Column: newCol.ColumnInfo,
diff --git a/pkg/ddl/multi_schema_change.go b/pkg/ddl/multi_schema_change.go
index d5009c0e81b66..cbbb60ad347c2 100644
--- a/pkg/ddl/multi_schema_change.go
+++ b/pkg/ddl/multi_schema_change.go
@@ -194,7 +194,6 @@ func appendToSubJobs(m *model.MultiSchemaInfo, jobW *JobWrapper) error {
         Revertible: true,
         CtxVars:    jobW.CtxVars,
         ReorgTp:    reorgTp,
-        UseCloud:   false,
     })
     return nil
 }
diff --git a/pkg/ddl/multi_schema_change_test.go b/pkg/ddl/multi_schema_change_test.go
index aa61da5243c1b..b8382baa5b110 100644
--- a/pkg/ddl/multi_schema_change_test.go
+++ b/pkg/ddl/multi_schema_change_test.go
@@ -641,9 +641,9 @@ func TestMultiSchemaChangeAdminShowDDLJobs(t *testing.T) {
         assert.Equal(t, 3, len(rows))
         assert.Equal(t, "test", rows[1][1])
         assert.Equal(t, "t", rows[1][2])
-        assert.Equal(t, "add index /* subjob */ /* txn-merge */", rows[1][3])
+        assert.Equal(t, "add index /* subjob */", rows[1][3])
         assert.Equal(t, "delete only", rows[1][4])
-        assert.Equal(t, "running", rows[1][len(rows[1])-1])
+        assert.Equal(t, "running", rows[1][len(rows[1])-2])
         assert.True(t, len(rows[1][8].(string)) > 0)
         assert.True(t, len(rows[1][9].(string)) > 0)
         assert.True(t, len(rows[1][10].(string)) > 0)
diff --git a/pkg/ddl/partition_test.go b/pkg/ddl/partition_test.go
index 160572096db4b..d05c413b69cd9 100644
--- a/pkg/ddl/partition_test.go
+++ b/pkg/ddl/partition_test.go
@@ -223,7 +223,7 @@ func TestReorganizePartitionRollback(t *testing.T) {
     // check job rollback finished
     rows := tk.MustQuery("admin show ddl jobs where JOB_ID=" + jobID).Rows()
     require.Equal(t, 1, len(rows))
-    require.Equal(t, "rollback done", rows[0][len(rows[0])-1])
+    require.Equal(t, "rollback done", rows[0][len(rows[0])-2])
 
     // check table meta after rollback
     tk.MustQuery("show create table t1").Check(testkit.Rows("t1 CREATE TABLE `t1` (\n" +
diff --git a/pkg/executor/BUILD.bazel b/pkg/executor/BUILD.bazel
index 72d59a63e7604..20a447cbc965a 100644
--- a/pkg/executor/BUILD.bazel
+++ b/pkg/executor/BUILD.bazel
@@ -359,6 +359,7 @@ go_test(
         "select_into_test.go",
         "select_test.go",
         "set_test.go",
+        "show_ddl_jobs_test.go",
         "show_placement_labels_test.go",
         "show_placement_test.go",
         "show_stats_test.go",
diff --git a/pkg/executor/infoschema_reader.go b/pkg/executor/infoschema_reader.go
index ef1f0e4a4e28a..cf58a97612a8d 100644
--- a/pkg/executor/infoschema_reader.go
+++ b/pkg/executor/infoschema_reader.go
@@ -1620,13 +1620,7 @@ func (e *DDLJobsReaderExec) Next(_ context.Context, req *chunk.Chunk) error {
     if e.cursor < len(e.runningJobs) {
         num := min(req.Capacity(), len(e.runningJobs)-e.cursor)
         for i := e.cursor; i < e.cursor+num; i++ {
-            e.appendJobToChunk(req, e.runningJobs[i], checker)
-            req.AppendString(12, e.runningJobs[i].Query)
-            if e.runningJobs[i].MultiSchemaInfo != nil {
-                for range e.runningJobs[i].MultiSchemaInfo.SubJobs {
-                    req.AppendString(12, e.runningJobs[i].Query)
-                }
-            }
+            e.appendJobToChunk(req, e.runningJobs[i], checker, false)
         }
         e.cursor += num
         count += num
@@ -1640,13 +1634,7 @@
             return err
         }
         for _, job := range e.cacheJobs {
-            e.appendJobToChunk(req, job, checker)
-            req.AppendString(12, job.Query)
-            if job.MultiSchemaInfo != nil {
-                for range job.MultiSchemaInfo.SubJobs {
-                    req.AppendString(12, job.Query)
-                }
-            }
+            e.appendJobToChunk(req, job, checker, false)
         }
         e.cursor += len(e.cacheJobs)
     }
diff --git a/pkg/executor/infoschema_reader_test.go b/pkg/executor/infoschema_reader_test.go
index 9f6dd66a35b48..954bb8f58255a 100644
--- a/pkg/executor/infoschema_reader_test.go
+++ b/pkg/executor/infoschema_reader_test.go
@@ -866,21 +866,21 @@ func TestInfoSchemaDDLJobs(t *testing.T) {
     tk2 := testkit.NewTestKit(t, store)
     tk2.MustQuery(`SELECT JOB_ID, JOB_TYPE, SCHEMA_STATE, SCHEMA_ID, TABLE_ID, table_name, STATE
     FROM information_schema.ddl_jobs WHERE table_name = "t1";`).Check(testkit.RowsWithSep("|",
-        "131|add index /* txn-merge */|public|124|129|t1|synced",
+        "131|add index|public|124|129|t1|synced",
         "130|create table|public|124|129|t1|synced",
-        "117|add index /* txn-merge */|public|110|115|t1|synced",
+        "117|add index|public|110|115|t1|synced",
         "116|create table|public|110|115|t1|synced",
     ))
     tk2.MustQuery(`SELECT JOB_ID, JOB_TYPE, SCHEMA_STATE, SCHEMA_ID, TABLE_ID, table_name, STATE
     FROM information_schema.ddl_jobs WHERE db_name = "d1" and JOB_TYPE LIKE "add index%%";`).Check(testkit.RowsWithSep("|",
-        "137|add index /* txn-merge */|public|124|135|t3|synced",
-        "134|add index /* txn-merge */|public|124|132|t2|synced",
-        "131|add index /* txn-merge */|public|124|129|t1|synced",
-        "128|add index /* txn-merge */|public|124|126|t0|synced",
+        "137|add index|public|124|135|t3|synced",
+        "134|add index|public|124|132|t2|synced",
+        "131|add index|public|124|129|t1|synced",
+        "128|add index|public|124|126|t0|synced",
     ))
     tk2.MustQuery(`SELECT JOB_ID, JOB_TYPE, SCHEMA_STATE, SCHEMA_ID, TABLE_ID, table_name, STATE
    FROM information_schema.ddl_jobs WHERE db_name = "d0" and table_name = "t3";`).Check(testkit.RowsWithSep("|",
-        "123|add index /* txn-merge */|public|110|121|t3|synced",
+        "123|add index|public|110|121|t3|synced",
         "122|create table|public|110|121|t3|synced",
     ))
     tk2.MustQuery(`SELECT JOB_ID, JOB_TYPE, SCHEMA_STATE, SCHEMA_ID, TABLE_ID, table_name, STATE
@@ -892,15 +892,15 @@ func TestInfoSchemaDDLJobs(t *testing.T) {
         if job.SchemaState == model.StateWriteOnly && loaded.CompareAndSwap(false, true) {
             tk2.MustQuery(`SELECT JOB_ID, JOB_TYPE, SCHEMA_STATE, SCHEMA_ID, TABLE_ID, table_name, STATE
             FROM information_schema.ddl_jobs WHERE table_name = "t0" and state = "running";`).Check(testkit.RowsWithSep("|",
-                "138 add index /* txn-merge */ write only 110 112 t0 running",
+                "138 add index write only 110 112 t0 running",
             ))
             tk2.MustQuery(`SELECT JOB_ID, JOB_TYPE, SCHEMA_STATE, SCHEMA_ID, TABLE_ID, table_name, STATE
             FROM information_schema.ddl_jobs WHERE db_name = "d0" and state = "running";`).Check(testkit.RowsWithSep("|",
-                "138 add index /* txn-merge */ write only 110 112 t0 running",
+                "138 add index write only 110 112 t0 running",
             ))
             tk2.MustQuery(`SELECT JOB_ID, JOB_TYPE, SCHEMA_STATE, SCHEMA_ID, TABLE_ID, table_name, STATE
             FROM information_schema.ddl_jobs WHERE state = "running";`).Check(testkit.RowsWithSep("|",
-                "138 add index /* txn-merge */ write only 110 112 t0 running",
+                "138 add index write only 110 112 t0 running",
             ))
         }
     })
diff --git a/pkg/executor/show_ddl_jobs.go b/pkg/executor/show_ddl_jobs.go
index 9d07679a59877..4761577cfe215 100644
--- a/pkg/executor/show_ddl_jobs.go
+++ b/pkg/executor/show_ddl_jobs.go
@@ -33,6 +33,7 @@ import (
     "github.com/pingcap/tidb/pkg/planner/core/base"
     "github.com/pingcap/tidb/pkg/privilege"
     "github.com/pingcap/tidb/pkg/sessionctx"
+    "github.com/pingcap/tidb/pkg/sessionctx/variable"
     "github.com/pingcap/tidb/pkg/sessiontxn"
     "github.com/pingcap/tidb/pkg/types"
     "github.com/pingcap/tidb/pkg/util/chunk"
@@ -90,7 +91,7 @@ func (e *ShowDDLJobsExec) Next(_ context.Context, req *chunk.Chunk) error {
     if e.cursor < len(e.runningJobs) {
         numCurBatch := min(req.Capacity(), len(e.runningJobs)-e.cursor)
         for i := e.cursor; i < e.cursor+numCurBatch; i++ {
-            e.appendJobToChunk(req, e.runningJobs[i], nil)
+            e.appendJobToChunk(req, e.runningJobs[i], nil, true)
         }
         e.cursor += numCurBatch
         count += numCurBatch
@@ -107,7 +108,7 @@ func (e *ShowDDLJobsExec) Next(_ context.Context, req *chunk.Chunk) error {
             return err
         }
         for _, job := range e.cacheJobs {
-            e.appendJobToChunk(req, job, nil)
+            e.appendJobToChunk(req, job, nil, true)
         }
         e.cursor += len(e.cacheJobs)
     }
@@ -185,7 +186,7 @@ func (e *DDLJobRetriever) initial(txn kv.Transaction, sess sessionctx.Context) e
     return nil
 }
 
-func (e *DDLJobRetriever) appendJobToChunk(req *chunk.Chunk, job *model.Job, checker privilege.Manager) {
+func (e *DDLJobRetriever) appendJobToChunk(req *chunk.Chunk, job *model.Job, checker privilege.Manager, inShowStmt bool) {
     schemaName := job.SchemaName
     tableName := ""
     finishTS := uint64(0)
@@ -231,7 +232,7 @@ func (e *DDLJobRetriever) appendJobToChunk(req *chunk.Chunk, job *model.Job, che
     req.AppendInt64(0, job.ID)
     req.AppendString(1, schemaName)
     req.AppendString(2, tableName)
-    req.AppendString(3, job.Type.String()+showAddIdxReorgTp(job))
+    req.AppendString(3, job.Type.String())
     req.AppendString(4, job.SchemaState.String())
     req.AppendInt64(5, job.SchemaID)
     req.AppendInt64(6, job.TableID)
@@ -249,12 +250,16 @@ func (e *DDLJobRetriever) appendJobToChunk(req *chunk.Chunk, job *model.Job, che
     }
     req.AppendString(11, job.State.String())
     if job.Type == model.ActionMultiSchemaChange {
-        isDistTask := job.ReorgMeta != nil && job.ReorgMeta.IsDistReorg
+        var useDXF, isCloud bool
+        if job.ReorgMeta != nil {
+            useDXF = job.ReorgMeta.IsDistReorg
+            isCloud = job.ReorgMeta.UseCloudStorage
+        }
         for _, subJob := range job.MultiSchemaInfo.SubJobs {
             req.AppendInt64(0, job.ID)
             req.AppendString(1, schemaName)
             req.AppendString(2, tableName)
-            req.AppendString(3, subJob.Type.String()+" /* subjob */"+showAddIdxReorgTpInSubJob(subJob, isDistTask))
+            req.AppendString(3, subJob.Type.String()+" /* subjob */")
             req.AppendString(4, subJob.SchemaState.String())
             req.AppendInt64(5, job.SchemaID)
             req.AppendInt64(6, job.TableID)
@@ -272,46 +277,70 @@ func (e *DDLJobRetriever) appendJobToChunk(req *chunk.Chunk, job *model.Job, che
                 req.AppendNull(10)
             }
             req.AppendString(11, subJob.State.String())
+            if inShowStmt {
+                req.AppendString(12, showCommentsFromSubjob(subJob, useDXF, isCloud))
+            } else {
+                req.AppendString(12, job.Query)
+            }
         }
     }
+    if inShowStmt {
+        req.AppendString(12, showCommentsFromJob(job))
+    } else {
+        req.AppendString(12, job.Query)
+    }
 }
 
-func showAddIdxReorgTp(job *model.Job) string {
-    if job.Type == model.ActionAddIndex || job.Type == model.ActionAddPrimaryKey {
-        if job.ReorgMeta != nil {
-            sb := strings.Builder{}
-            tp := job.ReorgMeta.ReorgTp.String()
-            if len(tp) > 0 {
-                sb.WriteString(" /* ")
-                sb.WriteString(tp)
-                if job.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge &&
-                    job.ReorgMeta.IsDistReorg &&
-                    job.ReorgMeta.UseCloudStorage {
-                    sb.WriteString(" cloud")
-                }
-                sb.WriteString(" */")
+func showCommentsFromJob(job *model.Job) string {
+    m := job.ReorgMeta
+    if m == nil {
+        return ""
+    }
+    var labels []string
+    if job.Type == model.ActionAddIndex ||
+        job.Type == model.ActionAddPrimaryKey {
+        switch m.ReorgTp {
+        case model.ReorgTypeTxn:
+            labels = append(labels, model.ReorgTypeTxn.String())
+        case model.ReorgTypeLitMerge:
+            labels = append(labels, model.ReorgTypeLitMerge.String())
+            if m.IsDistReorg {
+                labels = append(labels, "DXF")
             }
-            return sb.String()
+            if m.UseCloudStorage {
+                labels = append(labels, "cloud")
+            }
+        case model.ReorgTypeTxnMerge:
+            labels = append(labels, model.ReorgTypeTxnMerge.String())
+        }
+    }
+    if job.MayNeedReorg() {
+        if m.Concurrency != 0 && m.Concurrency != variable.DefTiDBDDLReorgWorkerCount {
+            labels = append(labels, fmt.Sprintf("thread=%d", m.Concurrency))
+        }
+        if m.BatchSize != 0 && m.BatchSize != variable.DefTiDBDDLReorgBatchSize {
+            labels = append(labels, fmt.Sprintf("batch_size=%d", m.BatchSize))
+        }
+        if m.TargetScope != "" {
+            labels = append(labels, fmt.Sprintf("service_scope=%s", m.TargetScope))
         }
     }
-    return ""
+    return strings.Join(labels, ", ")
 }
 
-func showAddIdxReorgTpInSubJob(subJob *model.SubJob, useDistTask bool) string {
-    if subJob.Type == model.ActionAddIndex || subJob.Type == model.ActionAddPrimaryKey {
-        sb := strings.Builder{}
-        tp := subJob.ReorgTp.String()
-        if len(tp) > 0 {
-            sb.WriteString(" /* ")
-            sb.WriteString(tp)
-            if subJob.ReorgTp == model.ReorgTypeLitMerge && useDistTask && subJob.UseCloud {
-                sb.WriteString(" cloud")
-            }
-            sb.WriteString(" */")
-        }
-        return sb.String()
+func showCommentsFromSubjob(sub *model.SubJob, useDXF, useCloud bool) string {
+    var labels []string
+    if sub.ReorgTp == model.ReorgTypeNone {
+        return ""
+    }
+    labels = append(labels, sub.ReorgTp.String())
+    if useDXF {
+        labels = append(labels, "DXF")
+    }
+    if useDXF && useCloud {
+        labels = append(labels, "cloud")
     }
-    return ""
+    return strings.Join(labels, ", ")
 }
 
 func ts2Time(timestamp uint64, loc *time.Location) types.Time {
diff --git a/pkg/executor/show_ddl_jobs_test.go b/pkg/executor/show_ddl_jobs_test.go
new file mode 100644
index 0000000000000..5b9aabd3324fb
--- /dev/null
+++ b/pkg/executor/show_ddl_jobs_test.go
@@ -0,0 +1,119 @@
+// Copyright 2024 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package executor
+
+import (
+    "testing"
+
+    "github.com/pingcap/tidb/pkg/meta/model"
+    "github.com/pingcap/tidb/pkg/sessionctx/variable"
+    "github.com/stretchr/testify/require"
+)
+
+func TestShowCommentsFromJob(t *testing.T) {
+    job := &model.Job{}
+    job.Type = model.ActionAddCheckConstraint
+    res := showCommentsFromJob(job)
+    require.Equal(t, "", res) // No reorg meta
+
+    job.Type = model.ActionAddIndex
+    job.ReorgMeta = &model.DDLReorgMeta{
+        ReorgTp: model.ReorgTypeTxn,
+    }
+    res = showCommentsFromJob(job)
+    require.Equal(t, "txn", res)
+
+    job.ReorgMeta = &model.DDLReorgMeta{
+        ReorgTp:     model.ReorgTypeTxn,
+        IsDistReorg: true,
+    }
+    res = showCommentsFromJob(job)
+    require.Equal(t, "txn", res)
+
+    job.ReorgMeta = &model.DDLReorgMeta{
+        ReorgTp:     model.ReorgTypeTxnMerge,
+        IsDistReorg: true,
+    }
+    res = showCommentsFromJob(job)
+    require.Equal(t, "txn-merge", res)
+
+    job.ReorgMeta = &model.DDLReorgMeta{
+        ReorgTp:     model.ReorgTypeLitMerge,
+        IsDistReorg: true,
+    }
+    res = showCommentsFromJob(job)
+    require.Equal(t, "ingest, DXF", res)
+
+    job.ReorgMeta = &model.DDLReorgMeta{
+        ReorgTp:         model.ReorgTypeLitMerge,
+        IsDistReorg:     true,
+        UseCloudStorage: true,
+    }
+    res = showCommentsFromJob(job)
+    require.Equal(t, "ingest, DXF, cloud", res)
+
+    job.ReorgMeta = &model.DDLReorgMeta{
+        ReorgTp:         model.ReorgTypeLitMerge,
+        IsDistReorg:     true,
+        UseCloudStorage: true,
+        Concurrency:     8,
+        BatchSize:       1024,
+    }
+    res = showCommentsFromJob(job)
+    require.Equal(t, "ingest, DXF, cloud, thread=8, batch_size=1024", res)
+
+    job.ReorgMeta = &model.DDLReorgMeta{
+        ReorgTp:         model.ReorgTypeLitMerge,
+        IsDistReorg:     true,
+        UseCloudStorage: true,
+        Concurrency:     variable.DefTiDBDDLReorgWorkerCount,
+        BatchSize:       variable.DefTiDBDDLReorgBatchSize,
+    }
+    res = showCommentsFromJob(job)
+    require.Equal(t, "ingest, DXF, cloud", res)
+
+    job.ReorgMeta = &model.DDLReorgMeta{
+        ReorgTp:         model.ReorgTypeLitMerge,
+        IsDistReorg:     true,
+        UseCloudStorage: true,
+        Concurrency:     variable.DefTiDBDDLReorgWorkerCount,
+        BatchSize:       variable.DefTiDBDDLReorgBatchSize,
+        TargetScope:     "background",
+    }
+    res = showCommentsFromJob(job)
+    require.Equal(t, "ingest, DXF, cloud, service_scope=background", res)
+}
+
+func TestShowCommentsFromSubJob(t *testing.T) {
+    subJob := &model.SubJob{
+        Type: model.ActionAddPrimaryKey,
+    }
+    subJob.ReorgTp = model.ReorgTypeNone
+    res := showCommentsFromSubjob(subJob, false, false)
+    require.Equal(t, "", res)
+
+    subJob.ReorgTp = model.ReorgTypeLitMerge
+    res = showCommentsFromSubjob(subJob, false, false)
+    require.Equal(t, "ingest", res)
+
+    res = showCommentsFromSubjob(subJob, true, false)
+    require.Equal(t, "ingest, DXF", res)
+
+    res = showCommentsFromSubjob(subJob, true, true)
+    require.Equal(t, "ingest, DXF, cloud", res)
+
+    res = showCommentsFromSubjob(subJob, false, true)
+    require.Equal(t, "ingest", res)
+}
diff --git a/pkg/executor/test/executor/executor_test.go b/pkg/executor/test/executor/executor_test.go
index b9efca75dd47d..e0901c3ffaef9 100644
--- a/pkg/executor/test/executor/executor_test.go
+++ b/pkg/executor/test/executor/executor_test.go
@@ -2441,7 +2441,7 @@ func TestAdmin(t *testing.T) {
     err = r.Next(ctx, req)
     require.NoError(t, err)
     row = req.GetRow(0)
-    require.Equal(t, 12, row.Len())
+    require.Equal(t, 13, row.Len())
     txn, err := store.Begin()
     require.NoError(t, err)
     historyJobs, err := ddl.GetLastNHistoryDDLJobs(meta.NewMutator(txn), ddl.DefNumHistoryJobs)
@@ -2457,7 +2457,7 @@ func TestAdmin(t *testing.T) {
     err = r.Next(ctx, req)
     require.NoError(t, err)
     row = req.GetRow(0)
-    require.Equal(t, 12, row.Len())
+    require.Equal(t, 13, row.Len())
     require.Equal(t, historyJobs[0].ID, row.GetInt64(0))
     require.NoError(t, err)
diff --git a/pkg/meta/model/job.go b/pkg/meta/model/job.go
index 96d75970b1bed..a2dc4d4d3e35a 100644
--- a/pkg/meta/model/job.go
+++ b/pkg/meta/model/job.go
@@ -884,7 +884,6 @@ type SubJob struct {
     CtxVars   []any     `json:"-"`
     SchemaVer int64     `json:"schema_version"`
     ReorgTp   ReorgType `json:"reorg_tp"`
-    UseCloud  bool      `json:"use_cloud"`
 }
 
 // IsNormal returns true if the sub-job is normally running.
@@ -952,8 +951,9 @@ func (sub *SubJob) FromProxyJob(proxyJob *Job, ver int64) {
     sub.Warning = proxyJob.Warning
     sub.RowCount = proxyJob.RowCount
     sub.SchemaVer = ver
-    sub.ReorgTp = proxyJob.ReorgMeta.ReorgTp
-    sub.UseCloud = proxyJob.ReorgMeta.UseCloudStorage
+    if proxyJob.ReorgMeta != nil {
+        sub.ReorgTp = proxyJob.ReorgMeta.ReorgTp
+    }
 }
 
 // FillArgs fills args.
diff --git a/pkg/planner/core/planbuilder.go b/pkg/planner/core/planbuilder.go
index ad4c1455a5a3a..24b2604453542 100644
--- a/pkg/planner/core/planbuilder.go
+++ b/pkg/planner/core/planbuilder.go
@@ -3163,6 +3163,7 @@ func buildShowDDLJobsFields() (*expression.Schema, types.NameSlice) {
     schema.Append(buildColumnWithName("", "START_TIME", mysql.TypeDatetime, 19))
     schema.Append(buildColumnWithName("", "END_TIME", mysql.TypeDatetime, 19))
     schema.Append(buildColumnWithName("", "STATE", mysql.TypeVarchar, 64))
+    schema.Append(buildColumnWithName("", "COMMENTS", mysql.TypeVarchar, 65535))
     return schema.col2Schema(), schema.names
 }
diff --git a/tests/integrationtest/r/executor/executor.result b/tests/integrationtest/r/executor/executor.result
index 5a61cfa41c802..f97e6126bd0a7 100644
--- a/tests/integrationtest/r/executor/executor.result
+++ b/tests/integrationtest/r/executor/executor.result
@@ -4324,13 +4324,13 @@ TOTAL_SPLIT_REGION SCATTER_FINISH_RATIO
 insert into t values (0,0),(10,10),(20,20),(30,30);
 alter table t add index idx1(b);
 admin show ddl jobs 1;
-JOB_ID DB_NAME TABLE_NAME JOB_TYPE SCHEMA_STATE SCHEMA_ID TABLE_ID ROW_COUNT CREATE_TIME START_TIME END_TIME STATE
- executor__executor t public 4 synced
+JOB_ID DB_NAME TABLE_NAME JOB_TYPE SCHEMA_STATE SCHEMA_ID TABLE_ID ROW_COUNT CREATE_TIME START_TIME END_TIME STATE COMMENTS
+ executor__executor t public 4 synced
 insert into t values (1,0),(2,10),(3,20),(4,30);
 alter table t add index idx2(b);
 admin show ddl jobs 1;
-JOB_ID DB_NAME TABLE_NAME JOB_TYPE SCHEMA_STATE SCHEMA_ID TABLE_ID ROW_COUNT CREATE_TIME START_TIME END_TIME STATE
- executor__executor t public 8 synced
+JOB_ID DB_NAME TABLE_NAME JOB_TYPE SCHEMA_STATE SCHEMA_ID TABLE_ID ROW_COUNT CREATE_TIME START_TIME END_TIME STATE COMMENTS
+ executor__executor t public 8 synced
 drop table if exists t;
 create table t(a int, b int as(-a));
 insert into t(a) values(1), (3), (7);
diff --git a/tests/integrationtest/r/executor/infoschema_reader.result b/tests/integrationtest/r/executor/infoschema_reader.result
index 9da5cdee9d0da..7db3c8c4ce6b8 100644
--- a/tests/integrationtest/r/executor/infoschema_reader.result
+++ b/tests/integrationtest/r/executor/infoschema_reader.result
@@ -172,7 +172,7 @@ db_name table_name job_type
 test_ddl_jobs tt alter table multi-schema change
 test_ddl_jobs tt add column /* subjob */
 test_ddl_jobs tt add column /* subjob */
-test_ddl_jobs tt add index /* subjob */ /* txn */
+test_ddl_jobs tt add index /* subjob */
 drop database test_ddl_jobs;
 use executor__infoschema_reader;
 set global tidb_ddl_enable_fast_reorg = default;
diff --git a/tests/integrationtest/t/executor/executor.test b/tests/integrationtest/t/executor/executor.test
index fae9c4c4b9b2a..7e240a2653c70 100644
--- a/tests/integrationtest/t/executor/executor.test
+++ b/tests/integrationtest/t/executor/executor.test
@@ -2623,12 +2623,12 @@ create table t (id bigint key,b int);
 split table t by (10),(20),(30);
 insert into t values (0,0),(10,10),(20,20),(30,30);
 alter table t add index idx1(b);
---replace_column 1 4 6 7 9 10 11
+--replace_column 1 4 6 7 9 10 11 13
 admin show ddl jobs 1;
 insert into t values (1,0),(2,10),(3,20),(4,30);
 alter table t add index idx2(b);
---replace_column 1 4 6 7 9 10 11
+--replace_column 1 4 6 7 9 10 11 13
 admin show ddl jobs 1;
 
 # TestSummaryFailedUpdate
diff --git a/tests/realtikvtest/addindextest2/global_sort_test.go b/tests/realtikvtest/addindextest2/global_sort_test.go
index c8ba7fa79dad0..42659b37f93f5 100644
--- a/tests/realtikvtest/addindextest2/global_sort_test.go
+++ b/tests/realtikvtest/addindextest2/global_sort_test.go
@@ -240,9 +240,9 @@ func TestAddIndexIngestShowReorgTp(t *testing.T) {
     rows := tk.MustQuery("admin show ddl jobs 1;").Rows()
     require.Len(t, rows, 1)
-    jobType, rowCnt := rows[0][3].(string), rows[0][7].(string)
-    require.True(t, strings.Contains(jobType, "ingest"))
-    require.False(t, strings.Contains(jobType, "cloud"))
+    jobType, rowCnt := rows[0][12].(string), rows[0][7].(string)
+    require.True(t, strings.Contains(jobType, "ingest"), jobType)
+    require.False(t, strings.Contains(jobType, "cloud"), jobType)
     require.Equal(t, rowCnt, "3")
 }
diff --git a/tests/realtikvtest/addindextest3/ingest_test.go b/tests/realtikvtest/addindextest3/ingest_test.go
index 0f9967e263a7a..d451a2e2ee213 100644
--- a/tests/realtikvtest/addindextest3/ingest_test.go
+++ b/tests/realtikvtest/addindextest3/ingest_test.go
@@ -104,8 +104,8 @@ func TestAddIndexIngestLimitOneBackend(t *testing.T) {
     wg.Wait()
     rows := tk.MustQuery("admin show ddl jobs 2;").Rows()
     require.Len(t, rows, 2)
-    require.True(t, strings.Contains(rows[0][3].(string) /* job_type */, "ingest"))
-    require.True(t, strings.Contains(rows[1][3].(string) /* job_type */, "ingest"))
+    require.True(t, strings.Contains(rows[0][12].(string) /* comments */, "ingest"))
+    require.True(t, strings.Contains(rows[1][12].(string) /* comments */, "ingest"))
     require.Equal(t, rows[0][7].(string) /* row_count */, "3")
     require.Equal(t, rows[1][7].(string) /* row_count */, "3")
@@ -160,7 +160,7 @@ func TestAddIndexIngestWriterCountOnPartitionTable(t *testing.T) {
     tk.MustExec("alter table t add index idx(a);")
     rows := tk.MustQuery("admin show ddl jobs 1;").Rows()
     require.Len(t, rows, 1)
-    jobTp := rows[0][3].(string)
+    jobTp := rows[0][12].(string)
     require.True(t, strings.Contains(jobTp, "ingest"), jobTp)
 }
@@ -204,7 +204,7 @@ func TestIngestMVIndexOnPartitionTable(t *testing.T) {
         tk.MustExec(c)
         rows := tk.MustQuery("admin show ddl jobs 1;").Rows()
         require.Len(t, rows, 1)
-        jobTp := rows[0][3].(string)
+        jobTp := rows[0][12].(string)
         require.True(t, strings.Contains(jobTp, "ingest"), jobTp)
         addIndexDone.Store(true)
         wg.Wait()
@@ -260,7 +260,7 @@ func TestAddIndexIngestAdjustBackfillWorker(t *testing.T) {
     tk.MustExec("admin check table t;")
     rows := tk.MustQuery("admin show ddl jobs 1;").Rows()
     require.Len(t, rows, 1)
-    jobTp := rows[0][3].(string)
+    jobTp := rows[0][12].(string)
     require.True(t, strings.Contains(jobTp, "ingest"), jobTp)
 }
@@ -290,7 +290,7 @@ func TestAddIndexIngestAdjustBackfillWorkerCountFail(t *testing.T) {
     tk.MustExec("alter table t add index idx(a);")
     rows := tk.MustQuery("admin show ddl jobs 1;").Rows()
     require.Len(t, rows, 1)
-    jobTp := rows[0][3].(string)
+    jobTp := rows[0][12].(string)
     require.True(t, strings.Contains(jobTp, "ingest"), jobTp)
     ingest.ImporterRangeConcurrencyForTest = nil
 }
@@ -307,7 +307,7 @@ func TestAddIndexIngestEmptyTable(t *testing.T) {
     rows := tk.MustQuery("admin show ddl jobs 1;").Rows()
     require.Len(t, rows, 1)
-    jobTp := rows[0][3].(string)
+    jobTp := rows[0][12].(string)
     require.True(t, strings.Contains(jobTp, "ingest"), jobTp)
 }
@@ -335,7 +335,7 @@ func TestAddIndexIngestRestoredData(t *testing.T) {
     tk.MustExec("admin check table tbl_5;")
     rows := tk.MustQuery("admin show ddl jobs 1;").Rows()
     require.Len(t, rows, 1)
-    jobTp := rows[0][3].(string)
+    jobTp := rows[0][12].(string)
     require.True(t, strings.Contains(jobTp, "ingest"), jobTp)
 }
@@ -444,7 +444,7 @@ func TestAddIndexMockFlushError(t *testing.T) {
     tk.MustExec("admin check table t;")
     rows := tk.MustQuery("admin show ddl jobs 1;").Rows()
     //nolint: forcetypeassert
-    jobTp := rows[0][3].(string)
+    jobTp := rows[0][12].(string)
     require.True(t, strings.Contains(jobTp, "ingest"), jobTp)
 }