ddl: support modify the related reorg config by SQL #57336

Merged 25 commits on Nov 15, 2024
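
This change adds an `admin alter ddl jobs` statement for tuning the reorg settings of a DDL job that is already in the job queue. A minimal usage sketch based on the test cases below (the job ID 123 is a placeholder; thread and batch_size are the two options exercised by the executor in this diff):

admin alter ddl jobs 123 thread = 8;
admin alter ddl jobs 123 batch_size = 256;
admin alter ddl jobs 123 thread = 16, batch_size = 512;

thread maps to ReorgMeta.Concurrency and batch_size to ReorgMeta.BatchSize, as asserted in TestAdminAlterDDLJobUpdateSysTable.
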
122 changes: 122 additions & 0 deletions pkg/ddl/db_test.go
@@ -18,6 +18,7 @@ import (
"context"
"fmt"
"math"
"strconv"
"strings"
"sync"
"testing"
@@ -1124,3 +1125,124 @@ func TestDDLJobErrEntrySizeTooLarge(t *testing.T) {
tk.MustExec("create table t1 (a int);")
tk.MustExec("alter table t add column b int;") // Should not block.
}

func insertMockJob2Table(tk *testkit.TestKit, job *model.Job) {
b, err := job.Encode(false)
tk.RequireNoError(err)
sql := fmt.Sprintf("insert into mysql.tidb_ddl_job(job_id, job_meta) values(%s, ?);",
strconv.FormatInt(job.ID, 10))
tk.MustExec(sql, b)
}

func getJobMetaByID(t *testing.T, tk *testkit.TestKit, jobID int64) *model.Job {
sql := fmt.Sprintf("select job_meta from mysql.tidb_ddl_job where job_id = %s",
strconv.FormatInt(jobID, 10))
rows := tk.MustQuery(sql)
res := rows.Rows()
require.Len(t, res, 1)
require.Len(t, res[0], 1)
jobBinary := []byte(res[0][0].(string))
job := model.Job{}
err := job.Decode(jobBinary)
require.NoError(t, err)
return &job
}

func deleteJobMetaByID(tk *testkit.TestKit, jobID int64) {
sql := fmt.Sprintf("delete from mysql.tidb_ddl_job where job_id = %s",
strconv.FormatInt(jobID, 10))
tk.MustExec(sql)
}

func TestAdminAlterDDLJobUpdateSysTable(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t (a int);")

job := model.Job{
ID: 1,
Type: model.ActionAddIndex,
ReorgMeta: &model.DDLReorgMeta{
Concurrency: 4,
BatchSize: 128,
},
}
insertMockJob2Table(tk, &job)
tk.MustExec(fmt.Sprintf("admin alter ddl jobs %d thread = 8;", job.ID))
j := getJobMetaByID(t, tk, job.ID)
require.Equal(t, j.ReorgMeta.Concurrency, 8)

tk.MustExec(fmt.Sprintf("admin alter ddl jobs %d batch_size = 256;", job.ID))
j = getJobMetaByID(t, tk, job.ID)
require.Equal(t, j.ReorgMeta.BatchSize, 256)

tk.MustExec(fmt.Sprintf("admin alter ddl jobs %d thread = 16, batch_size = 512;", job.ID))
j = getJobMetaByID(t, tk, job.ID)
require.Equal(t, j.ReorgMeta.Concurrency, 16)
require.Equal(t, j.ReorgMeta.BatchSize, 512)
deleteJobMetaByID(tk, job.ID)
}

func TestAdminAlterDDLJobUnsupportedCases(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t (a int);")

// invalid config value
tk.MustGetErrMsg("admin alter ddl jobs 1 thread = 0;", "the value 0 for thread is out of range [1, 256]")
tk.MustGetErrMsg("admin alter ddl jobs 1 thread = 257;", "the value 257 for thread is out of range [1, 256]")
tk.MustGetErrMsg("admin alter ddl jobs 1 batch_size = 31;", "the value 31 for batch_size is out of range [32, 10240]")
tk.MustGetErrMsg("admin alter ddl jobs 1 batch_size = 10241;", "the value 10241 for batch_size is out of range [32, 10240]")

// invalid job id
tk.MustGetErrMsg("admin alter ddl jobs 1 thread = 8;", "ddl job 1 is not running")

job := model.Job{
ID: 1,
Type: model.ActionAddColumn,
}
insertMockJob2Table(tk, &job)
// unsupported job type
tk.MustGetErrMsg(fmt.Sprintf("admin alter ddl jobs %d thread = 8;", job.ID),
"unsupported DDL operation: add column, only support add index(tidb_enable_dist_task=off), modify column and alter table reorganize partition DDL job")
deleteJobMetaByID(tk, 1)

job = model.Job{
ID: 1,
Type: model.ActionAddIndex,
ReorgMeta: &model.DDLReorgMeta{
IsDistReorg: true,
},
}
insertMockJob2Table(tk, &job)
// unsupported job type
tk.MustGetErrMsg(fmt.Sprintf("admin alter ddl jobs %d thread = 8;", job.ID),
"unsupported DDL operation: add index, only support add index(tidb_enable_dist_task=off), modify column and alter table reorganize partition DDL job")
deleteJobMetaByID(tk, 1)
}

func TestAdminAlterDDLJobCommitFailed(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t (a int);")
testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/executor/mockAlterDDLJobCommitFailed", `return(true)`)
defer testfailpoint.Disable(t, "github.com/pingcap/tidb/pkg/executor/mockAlterDDLJobCommitFailed")

job := model.Job{
ID: 1,
Type: model.ActionAddIndex,
ReorgMeta: &model.DDLReorgMeta{
Concurrency: 4,
BatchSize: 128,
},
}
insertMockJob2Table(tk, &job)
tk.MustGetErrMsg(fmt.Sprintf("admin alter ddl jobs %d thread = 8, batch_size = 256;", job.ID),
"mock commit failed on admin alter ddl jobs")
j := getJobMetaByID(t, tk, job.ID)
require.Equal(t, j.ReorgMeta, job.ReorgMeta)
deleteJobMetaByID(tk, job.ID)
}
2 changes: 2 additions & 0 deletions pkg/executor/BUILD.bazel
@@ -102,6 +102,8 @@ go_library(
"//pkg/ddl/label",
"//pkg/ddl/placement",
"//pkg/ddl/schematracker",
"//pkg/ddl/session",
"//pkg/ddl/util",
"//pkg/distsql",
"//pkg/distsql/context",
"//pkg/disttask/framework/handle",
11 changes: 11 additions & 0 deletions pkg/executor/builder.go
@@ -222,6 +222,8 @@ func (b *executorBuilder) build(p base.Plan) exec.Executor {
return b.buildPauseDDLJobs(v)
case *plannercore.ResumeDDLJobs:
return b.buildResumeDDLJobs(v)
case *plannercore.AlterDDLJob:
return b.buildAlterDDLJob(v)
case *plannercore.ShowNextRowID:
return b.buildShowNextRowID(v)
case *plannercore.ShowDDL:
@@ -359,6 +361,15 @@ func (b *executorBuilder) buildResumeDDLJobs(v *plannercore.ResumeDDLJobs) exec.
return e
}

func (b *executorBuilder) buildAlterDDLJob(v *plannercore.AlterDDLJob) exec.Executor {
e := &AlterDDLJobExec{
BaseExecutor: exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID()),
jobID: v.JobID,
AlterOpts: v.Options,
}
return e
}

func (b *executorBuilder) buildShowNextRowID(v *plannercore.ShowNextRowID) exec.Executor {
e := &ShowNextRowIDExec{
BaseExecutor: exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID()),
131 changes: 131 additions & 0 deletions pkg/executor/operate_ddl_jobs.go
@@ -19,8 +19,16 @@ import (
"fmt"
"strconv"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/ddl"
sess "github.com/pingcap/tidb/pkg/ddl/session"
"github.com/pingcap/tidb/pkg/ddl/util"
"github.com/pingcap/tidb/pkg/executor/internal/exec"
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/util/chunk"
)
@@ -85,3 +93,126 @@ type PauseDDLJobsExec struct {
type ResumeDDLJobsExec struct {
*CommandDDLJobsExec
}

// AlterDDLJobExec is the executor for altering the config of a DDL job.
type AlterDDLJobExec struct {
exec.BaseExecutor
jobID int64
AlterOpts []*core.AlterDDLJobOpt
}

// Open implements the Executor Open interface.
func (e *AlterDDLJobExec) Open(ctx context.Context) error {
newSess, err := e.GetSysSession()
if err != nil {
return err
}
defer e.ReleaseSysSession(kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL), newSess)

return e.processAlterDDLJobConfig(ctx, newSess)
}

func getJobMetaFromTable(
ctx context.Context,
se *sess.Session,
jobID int64,
) (*model.Job, error) {
sql := fmt.Sprintf("select job_meta from mysql.%s where job_id = %s",
ddl.JobTable, strconv.FormatInt(jobID, 10))
rows, err := se.Execute(ctx, sql, "get_job_by_id")
if err != nil {
return nil, errors.Trace(err)
}
if len(rows) == 0 {
return nil, fmt.Errorf("ddl job %d is not running", jobID)
}
jobBinary := rows[0].GetBytes(0)
job := model.Job{}
err = job.Decode(jobBinary)
if err != nil {
return nil, errors.Trace(err)
}
return &job, nil
}

func updateJobMeta2Table(
ctx context.Context,
se *sess.Session,
job *model.Job,
) error {
b, err := job.Encode(false)
if err != nil {
return err
}
sql := fmt.Sprintf("update mysql.%s set job_meta = %s where job_id = %d",
ddl.JobTable, util.WrapKey2String(b), job.ID)
_, err = se.Execute(ctx, sql, "update_job")
return errors.Trace(err)
}

const alterDDLJobMaxRetryCnt = 3

// processAlterDDLJobConfig tries to alter the DDL job configs.
// In case of failure, it will retry alterDDLJobMaxRetryCnt times.
func (e *AlterDDLJobExec) processAlterDDLJobConfig(
ctx context.Context,
sessCtx sessionctx.Context,
) (err error) {
ns := sess.NewSession(sessCtx)
var job *model.Job
for tryN := uint(0); tryN < alterDDLJobMaxRetryCnt; tryN++ {
if err = ns.Begin(ctx); err != nil {
continue
}
job, err = getJobMetaFromTable(ctx, ns, e.jobID)
if err != nil {
continue
}
if !job.IsAlterable() {
return fmt.Errorf("unsupported DDL operation: %s, "+
"only support add index(tidb_enable_dist_task=off), modify column and alter table reorganize partition DDL job", job.Type.String())
}
if err = e.updateReorgMeta(job, model.AdminCommandByEndUser); err != nil {
continue
}
if err = updateJobMeta2Table(ctx, ns, job); err != nil {
continue
}

failpoint.Inject("mockAlterDDLJobCommitFailed", func(val failpoint.Value) {
if val.(bool) {
ns.Rollback()
failpoint.Return(errors.New("mock commit failed on admin alter ddl jobs"))
}
})

if err = ns.Commit(ctx); err != nil {
ns.Rollback()
continue
}
return nil
}
return err
}

func (e *AlterDDLJobExec) updateReorgMeta(job *model.Job, byWho model.AdminCommandOperator) error {
for _, opt := range e.AlterOpts {
switch opt.Name {
case core.AlterDDLJobThread:
if opt.Value != nil {
cons := opt.Value.(*expression.Constant)
job.ReorgMeta.Concurrency = int(cons.Value.GetInt64())
}
job.AdminOperator = byWho
case core.AlterDDLJobBatchSize:
if opt.Value != nil {
cons := opt.Value.(*expression.Constant)
job.ReorgMeta.BatchSize = int(cons.Value.GetInt64())
}
job.AdminOperator = byWho
default:
return errors.Errorf("unsupported admin alter ddl jobs config: %s", opt.Name)
}
}
return nil
}
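
For reference, the executor applies the change by re-encoding the whole job and rewriting its job_meta row in mysql.tidb_ddl_job inside a short retry loop (alterDDLJobMaxRetryCnt attempts). A rough SQL sketch of the statements getJobMetaFromTable and updateJobMeta2Table build internally, with job ID 123 as a placeholder and the encoded job bytes elided (the 0x literal form is assumed from util.WrapKey2String):

select job_meta from mysql.tidb_ddl_job where job_id = 123;
update mysql.tidb_ddl_job set job_meta = 0x... where job_id = 123;

If no row with that job ID exists, the statement fails with "ddl job 123 is not running", as covered in TestAdminAlterDDLJobUnsupportedCases.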
8 changes: 8 additions & 0 deletions pkg/meta/model/job.go
@@ -640,6 +640,14 @@ func (job *Job) IsPausable() bool {
return job.NotStarted() || (job.IsRunning() && job.IsRollbackable())
}

// IsAlterable checks whether the job type can be altered.
func (job *Job) IsAlterable() bool {
// Currently, only non-distributed add index, modify column and reorganize partition reorg tasks can be altered.
return job.Type == ActionAddIndex && !job.ReorgMeta.IsDistReorg ||
job.Type == ActionModifyColumn ||
job.Type == ActionReorganizePartition
}

// IsResumable checks whether the job can be resumed.
func (job *Job) IsResumable() bool {
return job.IsPaused()
29 changes: 29 additions & 0 deletions pkg/planner/core/common_plans.go
@@ -148,6 +148,35 @@ type ResumeDDLJobs struct {
JobIDs []int64
}

const (
// AlterDDLJobThread alter reorg worker count
AlterDDLJobThread = "thread"
// AlterDDLJobBatchSize alter reorg batch size
AlterDDLJobBatchSize = "batch_size"
// AlterDDLJobMaxWriteSpeed alter reorg max write speed
AlterDDLJobMaxWriteSpeed = "max_write_speed"
)

var allowedAlterDDLJobParams = map[string]struct{}{
AlterDDLJobThread: {},
AlterDDLJobBatchSize: {},
AlterDDLJobMaxWriteSpeed: {},
}

// AlterDDLJobOpt represents alter ddl job option.
type AlterDDLJobOpt struct {
Name string
Value expression.Expression
}

// AlterDDLJob is the plan of admin alter ddl job
type AlterDDLJob struct {
baseSchemaProducer

JobID int64
Options []*AlterDDLJobOpt
}

// ReloadExprPushdownBlacklist reloads the data from expr_pushdown_blacklist table.
type ReloadExprPushdownBlacklist struct {
baseSchemaProducer
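
The planner also whitelists a max_write_speed option, but the executor's updateReorgMeta switch in this diff handles only thread and batch_size, so a max_write_speed clause would fall into the default branch and report "unsupported admin alter ddl jobs config: max_write_speed". A hedged sketch of the syntax, assuming it takes an integer constant like the other options (the value format is an assumption, not confirmed by this diff):

admin alter ddl jobs 123 max_write_speed = 1048576;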