From ec8b81b98edceb290a76fd4eae1050a62c0c9b3e Mon Sep 17 00:00:00 2001 From: fzzf678 <108643977+fzzf678@users.noreply.github.com> Date: Mon, 25 Nov 2024 22:13:35 +0800 Subject: [PATCH] ddl: dynamically adjusting the max write speed of reorganization job (#57611) ref pingcap/tidb#57526 --- pkg/ddl/backfilling.go | 47 ++++++++----- pkg/ddl/backfilling_dist_executor.go | 1 + pkg/ddl/backfilling_read_index.go | 4 +- pkg/ddl/backfilling_test.go | 7 +- pkg/ddl/db_test.go | 48 +++++++++---- pkg/ddl/executor.go | 1 + pkg/ddl/index.go | 2 +- pkg/ddl/ingest/backend_mgr.go | 4 +- pkg/ddl/ingest/config.go | 3 +- pkg/ddl/ingest/mock.go | 2 +- pkg/ddl/job_worker.go | 6 ++ pkg/executor/operate_ddl_jobs.go | 20 ++++-- pkg/executor/show_ddl_jobs.go | 4 ++ pkg/executor/show_ddl_jobs_test.go | 17 +++-- pkg/lightning/backend/local/local.go | 7 +- pkg/lightning/backend/local/local_test.go | 8 +-- pkg/lightning/backend/local/localhelper.go | 68 +++++++++++++------ .../backend/local/localhelper_test.go | 37 ++++++++++ pkg/lightning/backend/local/region_job.go | 10 +++ .../backend/local/region_job_test.go | 22 ++++++ pkg/meta/model/BUILD.bazel | 1 + pkg/meta/model/reorg.go | 30 +++++--- pkg/planner/core/BUILD.bazel | 2 + pkg/planner/core/planbuilder.go | 49 ++++++++++++- pkg/planner/core/planbuilder_test.go | 55 +++++++++++++-- .../addindextest3/functional_test.go | 2 +- 26 files changed, 358 insertions(+), 99 deletions(-) diff --git a/pkg/ddl/backfilling.go b/pkg/ddl/backfilling.go index e3d52c22481db..394b1e77436ed 100644 --- a/pkg/ddl/backfilling.go +++ b/pkg/ddl/backfilling.go @@ -708,8 +708,9 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode( //nolint: forcetypeassert discovery := dc.store.(tikv.Storage).GetRegionCache().PDClient().GetServiceDiscovery() importConc := job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())) + maxWriteSpeed := job.ReorgMeta.GetMaxWriteSpeedOrDefault() bcCtx, err := ingest.LitBackCtxMgr.Register( - ctx, job.ID, hasUnique, nil, discovery, job.ReorgMeta.ResourceGroupName, importConc, job.RealStartTS) + ctx, job.ID, hasUnique, nil, discovery, job.ReorgMeta.ResourceGroupName, importConc, maxWriteSpeed, job.RealStartTS) if err != nil { return errors.Trace(err) } @@ -776,7 +777,7 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode( if err != nil { return err } - err = executeAndClosePipeline(opCtx, pipe, job, avgRowSize) + err = executeAndClosePipeline(opCtx, pipe, job, bcCtx, avgRowSize) if err != nil { err1 := bcCtx.FinishAndUnregisterEngines(ingest.OptCloseEngines) if err1 != nil { @@ -793,7 +794,7 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode( return bcCtx.FinishAndUnregisterEngines(ingest.OptCleanData | ingest.OptCheckDup) } -func adjustWorkerPoolSize(ctx context.Context, pipe *operator.AsyncPipeline, job *model.Job, avgRowSize int) { +func adjustWorkerCntAndMaxWriteSpeed(ctx context.Context, pipe *operator.AsyncPipeline, job *model.Job, bcCtx ingest.BackendCtx, avgRowSize int) { opR, opW := pipe.GetLocalIngestModeReaderAndWriter() if opR == nil || opW == nil { logutil.DDLIngestLogger().Error("failed to get local ingest mode reader or writer", zap.Int64("jobID", job.ID)) @@ -817,36 +818,50 @@ func adjustWorkerPoolSize(ctx context.Context, pipe *operator.AsyncPipeline, job case <-ctx.Done(): return case <-ticker.C: - targetReaderCnt, targetWriterCnt := expectedIngestWorkerCnt( - job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())), avgRowSize) + maxWriteSpeed := job.ReorgMeta.GetMaxWriteSpeedOrDefault() + if maxWriteSpeed != 
bcCtx.GetLocalBackend().GetWriteSpeedLimit() { + bcCtx.GetLocalBackend().UpdateWriteSpeedLimit(maxWriteSpeed) + logutil.DDLIngestLogger().Info("adjust ddl job config success", + zap.Int64("jobID", job.ID), + zap.Int("max write speed", bcCtx.GetLocalBackend().GetWriteSpeedLimit())) + } + + concurrency := job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())) + targetReaderCnt, targetWriterCnt := expectedIngestWorkerCnt(concurrency, avgRowSize) currentReaderCnt, currentWriterCnt := reader.GetWorkerPoolSize(), writer.GetWorkerPoolSize() - if int32(targetReaderCnt) == currentReaderCnt && int32(targetWriterCnt) == currentWriterCnt { - continue + if int32(targetReaderCnt) != currentReaderCnt || int32(targetWriterCnt) != currentWriterCnt { + reader.TuneWorkerPoolSize(int32(targetReaderCnt)) + writer.TuneWorkerPoolSize(int32(targetWriterCnt)) + logutil.DDLIngestLogger().Info("adjust ddl job config success", + zap.Int64("jobID", job.ID), + zap.Int32("table scan operator count", reader.GetWorkerPoolSize()), + zap.Int32("index ingest operator count", writer.GetWorkerPoolSize())) } - reader.TuneWorkerPoolSize(int32(targetReaderCnt)) - writer.TuneWorkerPoolSize(int32(targetWriterCnt)) - logutil.DDLIngestLogger().Info("adjust ddl job config success", - zap.Int64("jobID", job.ID), - zap.Int32("table scan operator count", reader.GetWorkerPoolSize()), - zap.Int32("index ingest operator count", writer.GetWorkerPoolSize())) } } } -func executeAndClosePipeline(ctx *OperatorCtx, pipe *operator.AsyncPipeline, job *model.Job, avgRowSize int) error { +func executeAndClosePipeline(ctx *OperatorCtx, pipe *operator.AsyncPipeline, job *model.Job, bcCtx ingest.BackendCtx, avgRowSize int) error { err := pipe.Execute() if err != nil { return err } - // Adjust worker pool size dynamically. + // Adjust worker pool size and max write speed dynamically. + var wg util.WaitGroupWrapper + adjustCtx, cancel := context.WithCancel(ctx) if job != nil { + wg.Add(1) go func() { - adjustWorkerPoolSize(ctx, pipe, job, avgRowSize) + defer wg.Done() + adjustWorkerCntAndMaxWriteSpeed(adjustCtx, pipe, job, bcCtx, avgRowSize) }() } err = pipe.Close() + + cancel() + wg.Wait() // wait for adjustWorkerCntAndMaxWriteSpeed to exit if opErr := ctx.OperatorErr(); opErr != nil { return opErr } diff --git a/pkg/ddl/backfilling_dist_executor.go b/pkg/ddl/backfilling_dist_executor.go index d9b5e6c062273..799be7baa1f8c 100644 --- a/pkg/ddl/backfilling_dist_executor.go +++ b/pkg/ddl/backfilling_dist_executor.go @@ -154,6 +154,7 @@ func (s *backfillDistExecutor) getBackendCtx() (ingest.BackendCtx, error) { discovery, job.ReorgMeta.ResourceGroupName, job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())), + job.ReorgMeta.GetMaxWriteSpeedOrDefault(), job.RealStartTS, ) } diff --git a/pkg/ddl/backfilling_read_index.go b/pkg/ddl/backfilling_read_index.go index de0b3c8409db6..14e7fb501514d 100644 --- a/pkg/ddl/backfilling_read_index.go +++ b/pkg/ddl/backfilling_read_index.go @@ -114,14 +114,14 @@ func (r *readIndexExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta if err != nil { return err } - return executeAndClosePipeline(opCtx, pipe, nil, 0) + return executeAndClosePipeline(opCtx, pipe, nil, nil, 0) } pipe, err := r.buildLocalStorePipeline(opCtx, sm, subtask.Concurrency) if err != nil { return err } - err = executeAndClosePipeline(opCtx, pipe, nil, 0) + err = executeAndClosePipeline(opCtx, pipe, nil, nil, 0) if err != nil { // For dist task local based ingest, checkpoint is unsupported. 
// If there is an error we should keep local sort dir clean. diff --git a/pkg/ddl/backfilling_test.go b/pkg/ddl/backfilling_test.go index ed0802a45f5c8..d67fac8443331 100644 --- a/pkg/ddl/backfilling_test.go +++ b/pkg/ddl/backfilling_test.go @@ -489,10 +489,9 @@ func TestValidateAndFillRanges(t *testing.T) { } func TestTuneTableScanWorkerBatchSize(t *testing.T) { - reorgMeta := &model.DDLReorgMeta{ - Concurrency: 4, - BatchSize: 32, - } + reorgMeta := &model.DDLReorgMeta{} + reorgMeta.Concurrency.Store(4) + reorgMeta.BatchSize.Store(32) copCtx := &copr.CopContextSingleIndex{ CopContextBase: &copr.CopContextBase{ FieldTypes: []*types.FieldType{}, diff --git a/pkg/ddl/db_test.go b/pkg/ddl/db_test.go index ed2957c039988..2960fd1e188e6 100644 --- a/pkg/ddl/db_test.go +++ b/pkg/ddl/db_test.go @@ -1161,13 +1161,12 @@ func TestAdminAlterDDLJobUpdateSysTable(t *testing.T) { tk.MustExec("create table t (a int);") job := model.Job{ - ID: 1, - Type: model.ActionAddIndex, - ReorgMeta: &model.DDLReorgMeta{ - Concurrency: 4, - BatchSize: 128, - }, + ID: 1, + Type: model.ActionAddIndex, + ReorgMeta: &model.DDLReorgMeta{}, } + job.ReorgMeta.Concurrency.Store(4) + job.ReorgMeta.BatchSize.Store(128) insertMockJob2Table(tk, &job) tk.MustExec(fmt.Sprintf("admin alter ddl jobs %d thread = 8;", job.ID)) j := getJobMetaByID(t, tk, job.ID) @@ -1193,8 +1192,34 @@ func TestAdminAlterDDLJobUnsupportedCases(t *testing.T) { // invalid config value tk.MustGetErrMsg("admin alter ddl jobs 1 thread = 0;", "the value 0 for thread is out of range [1, 256]") tk.MustGetErrMsg("admin alter ddl jobs 1 thread = 257;", "the value 257 for thread is out of range [1, 256]") + tk.MustGetErrMsg("admin alter ddl jobs 1 thread = 10.5;", "the value for thread is invalid, only integer is allowed") + tk.MustGetErrMsg("admin alter ddl jobs 1 thread = '16';", "the value for thread is invalid, only integer is allowed") + tk.MustGetErrMsg("admin alter ddl jobs 1 thread = '';", "the value for thread is invalid, only integer is allowed") tk.MustGetErrMsg("admin alter ddl jobs 1 batch_size = 31;", "the value 31 for batch_size is out of range [32, 10240]") tk.MustGetErrMsg("admin alter ddl jobs 1 batch_size = 10241;", "the value 10241 for batch_size is out of range [32, 10240]") + tk.MustGetErrMsg("admin alter ddl jobs 1 batch_size = 321.3;", "the value for batch_size is invalid, only integer is allowed") + tk.MustGetErrMsg("admin alter ddl jobs 1 batch_size = '512';", "the value for batch_size is invalid, only integer is allowed") + tk.MustGetErrMsg("admin alter ddl jobs 1 batch_size = '';", "the value for batch_size is invalid, only integer is allowed") + tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = '2PiB';", "the value 2251799813685248 for max_write_speed is out of range [0, 1125899906842624]") + tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = -1;", "the value -1 for max_write_speed is out of range [0, 1125899906842624]") + tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = 1.23;", "the value 1.23 for max_write_speed is invalid") + tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = 'MiB';", "parse max_write_speed value error: invalid size: 'MiB'") + tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = 'asd';", "parse max_write_speed value error: invalid size: 'asd'") + tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = '';", "parse max_write_speed value error: invalid size: ''") + tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = '20xl';", "parse max_write_speed value error: 
invalid suffix: 'xl'") + tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = 1.2.3;", "[parser:1064]You have an error in your SQL syntax; check the manual that corresponds to your TiDB version for the right syntax to use line 1 column 46 near \".3;\" ") + tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = 20+30;", "[parser:1064]You have an error in your SQL syntax; check the manual that corresponds to your TiDB version for the right syntax to use line 1 column 44 near \"+30;\" ") + tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = rand();", "[parser:1064]You have an error in your SQL syntax; check the manual that corresponds to your TiDB version for the right syntax to use line 1 column 45 near \"rand();\" ") + // valid config value + tk.MustGetErrMsg("admin alter ddl jobs 1 thread = 16;", "ddl job 1 is not running") + tk.MustGetErrMsg("admin alter ddl jobs 1 batch_size = 64;", "ddl job 1 is not running") + tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = '0';", "ddl job 1 is not running") + tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = '64';", "ddl job 1 is not running") + tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = '2KB';", "ddl job 1 is not running") + tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = '3MiB';", "ddl job 1 is not running") + tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = '4 gb';", "ddl job 1 is not running") + tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = 1;", "ddl job 1 is not running") + tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = '1.23';", "ddl job 1 is not running") // invalid job id tk.MustGetErrMsg("admin alter ddl jobs 1 thread = 8;", "ddl job 1 is not running") @@ -1232,13 +1257,12 @@ func TestAdminAlterDDLJobCommitFailed(t *testing.T) { defer testfailpoint.Disable(t, "github.com/pingcap/tidb/pkg/executor/mockAlterDDLJobCommitFailed") job := model.Job{ - ID: 1, - Type: model.ActionAddIndex, - ReorgMeta: &model.DDLReorgMeta{ - Concurrency: 4, - BatchSize: 128, - }, + ID: 1, + Type: model.ActionAddIndex, + ReorgMeta: &model.DDLReorgMeta{}, } + job.ReorgMeta.Concurrency.Store(4) + job.ReorgMeta.BatchSize.Store(128) insertMockJob2Table(tk, &job) tk.MustGetErrMsg(fmt.Sprintf("admin alter ddl jobs %d thread = 8, batch_size = 256;", job.ID), "mock commit failed on admin alter ddl jobs") diff --git a/pkg/ddl/executor.go b/pkg/ddl/executor.go index cba9955689c25..1c1b7a384c823 100644 --- a/pkg/ddl/executor.go +++ b/pkg/ddl/executor.go @@ -4928,6 +4928,7 @@ func initJobReorgMetaFromVariables(job *model.Job, sctx sessionctx.Context) erro if sv, ok := sctx.GetSessionVars().GetSystemVar(variable.TiDBDDLReorgBatchSize); ok { m.SetBatchSize(variable.TidbOptInt(sv, 0)) } + m.SetMaxWriteSpeed(int(variable.DDLReorgMaxWriteSpeed.Load())) } setDistTaskParam := func() error { m.IsDistReorg = variable.EnableDistTask.Load() diff --git a/pkg/ddl/index.go b/pkg/ddl/index.go index 7bdf73313b708..824cedfc2a56f 100644 --- a/pkg/ddl/index.go +++ b/pkg/ddl/index.go @@ -2429,7 +2429,7 @@ func checkDuplicateForUniqueIndex(ctx context.Context, t table.Table, reorgInfo ctx := tidblogutil.WithCategory(ctx, "ddl-ingest") if bc == nil { bc, err = ingest.LitBackCtxMgr.Register( - ctx, reorgInfo.ID, indexInfo.Unique, nil, discovery, reorgInfo.ReorgMeta.ResourceGroupName, 1, reorgInfo.RealStartTS) + ctx, reorgInfo.ID, indexInfo.Unique, nil, discovery, reorgInfo.ReorgMeta.ResourceGroupName, 1, 0, reorgInfo.RealStartTS) if err != nil { return err } diff --git 
a/pkg/ddl/ingest/backend_mgr.go b/pkg/ddl/ingest/backend_mgr.go index 81c29617ae0c3..048f883e4ceef 100644 --- a/pkg/ddl/ingest/backend_mgr.go +++ b/pkg/ddl/ingest/backend_mgr.go @@ -51,6 +51,7 @@ type BackendCtxMgr interface { pdSvcDiscovery pd.ServiceDiscovery, resourceGroupName string, importConc int, + maxWriteSpeed int, initTS uint64, ) (BackendCtx, error) Unregister(jobID int64) @@ -118,6 +119,7 @@ func (m *litBackendCtxMgr) Register( pdSvcDiscovery pd.ServiceDiscovery, resourceGroupName string, concurrency int, + maxWriteSpeed int, initTS uint64, ) (BackendCtx, error) { bc, exist := m.Load(jobID) @@ -136,7 +138,7 @@ func (m *litBackendCtxMgr) Register( logutil.Logger(ctx).Error(LitErrCreateDirFail, zap.Error(err)) return nil, err } - cfg, err := genConfig(ctx, sortPath, m.memRoot, hasUnique, resourceGroupName, concurrency) + cfg, err := genConfig(ctx, sortPath, m.memRoot, hasUnique, resourceGroupName, concurrency, maxWriteSpeed) if err != nil { logutil.Logger(ctx).Warn(LitWarnConfigError, zap.Int64("job ID", jobID), zap.Error(err)) return nil, err diff --git a/pkg/ddl/ingest/config.go b/pkg/ddl/ingest/config.go index cc6419c7271ce..a97cb81d9cd03 100644 --- a/pkg/ddl/ingest/config.go +++ b/pkg/ddl/ingest/config.go @@ -44,6 +44,7 @@ func genConfig( unique bool, resourceGroup string, concurrency int, + maxWriteSpeed int, ) (*local.BackendConfig, error) { cfg := &local.BackendConfig{ LocalStoreDir: jobSortPath, @@ -68,7 +69,7 @@ func genConfig( PausePDSchedulerScope: lightning.PausePDSchedulerScopeTable, TaskType: kvutil.ExplicitTypeDDL, DisableAutomaticCompactions: true, - StoreWriteBWLimit: int(variable.DDLReorgMaxWriteSpeed.Load()), + StoreWriteBWLimit: maxWriteSpeed, } // Each backend will build a single dir in lightning dir. if ImporterRangeConcurrencyForTest != nil { diff --git a/pkg/ddl/ingest/mock.go b/pkg/ddl/ingest/mock.go index 59dcb167fdfec..88b50ad099255 100644 --- a/pkg/ddl/ingest/mock.go +++ b/pkg/ddl/ingest/mock.go @@ -57,7 +57,7 @@ func (m *MockBackendCtxMgr) CheckMoreTasksAvailable() (bool, error) { // Register implements BackendCtxMgr.Register interface. func (m *MockBackendCtxMgr) Register(ctx context.Context, jobID int64, unique bool, etcdClient *clientv3.Client, - pdSvcDiscovery pd.ServiceDiscovery, resourceGroupName string, importConc int, initTS uint64) (BackendCtx, error) { + pdSvcDiscovery pd.ServiceDiscovery, resourceGroupName string, importConc int, maxWriteSpeed int, initTS uint64) (BackendCtx, error) { logutil.DDLIngestLogger().Info("mock backend mgr register", zap.Int64("jobID", jobID)) if mockCtx, ok := m.runningJobs[jobID]; ok { return mockCtx, nil diff --git a/pkg/ddl/job_worker.go b/pkg/ddl/job_worker.go index 01c0dc5547cfc..5a5c45f127d36 100644 --- a/pkg/ddl/job_worker.go +++ b/pkg/ddl/job_worker.go @@ -867,6 +867,7 @@ func (w *worker) runOneJobStep( if latestJob.IsAlterable() { job.ReorgMeta.SetConcurrency(latestJob.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter()))) job.ReorgMeta.SetBatchSize(latestJob.ReorgMeta.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize()))) + job.ReorgMeta.SetMaxWriteSpeed(latestJob.ReorgMeta.GetMaxWriteSpeedOrDefault()) } } } @@ -874,6 +875,11 @@ func (w *worker) runOneJobStep( }) } } + // When upgrading from a version where the ReorgMeta fields did not exist in the DDL job information, + // the unmarshalled job will have a nil value for the ReorgMeta field. 
+ if w.tp == addIdxWorker && job.ReorgMeta == nil { + job.ReorgMeta = &model.DDLReorgMeta{} + } prevState := job.State diff --git a/pkg/executor/operate_ddl_jobs.go b/pkg/executor/operate_ddl_jobs.go index 5a48ee4fd535a..2553f71ac5c0a 100644 --- a/pkg/executor/operate_ddl_jobs.go +++ b/pkg/executor/operate_ddl_jobs.go @@ -197,18 +197,24 @@ func (e *AlterDDLJobExec) processAlterDDLJobConfig( func (e *AlterDDLJobExec) updateReorgMeta(job *model.Job, byWho model.AdminCommandOperator) error { for _, opt := range e.AlterOpts { + if opt.Value == nil { + continue + } switch opt.Name { case core.AlterDDLJobThread: - if opt.Value != nil { - cons := opt.Value.(*expression.Constant) - job.ReorgMeta.SetConcurrency(int(cons.Value.GetInt64())) - } + cons := opt.Value.(*expression.Constant) + job.ReorgMeta.SetConcurrency(int(cons.Value.GetInt64())) job.AdminOperator = byWho case core.AlterDDLJobBatchSize: - if opt.Value != nil { - cons := opt.Value.(*expression.Constant) - job.ReorgMeta.SetBatchSize(int(cons.Value.GetInt64())) + cons := opt.Value.(*expression.Constant) + job.ReorgMeta.SetBatchSize(int(cons.Value.GetInt64())) + job.AdminOperator = byWho + case core.AlterDDLJobMaxWriteSpeed: + speed, err := core.GetMaxWriteSpeedFromExpression(opt) + if err != nil { + return err } + job.ReorgMeta.SetMaxWriteSpeed(int(speed)) job.AdminOperator = byWho default: return errors.Errorf("unsupported admin alter ddl jobs config: %s", opt.Name) diff --git a/pkg/executor/show_ddl_jobs.go b/pkg/executor/show_ddl_jobs.go index 440e0bc995ffb..a885d5d5e18c9 100644 --- a/pkg/executor/show_ddl_jobs.go +++ b/pkg/executor/show_ddl_jobs.go @@ -317,12 +317,16 @@ func showCommentsFromJob(job *model.Job) string { if job.MayNeedReorg() { concurrency := m.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())) batchSize := m.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize())) + maxWriteSpeed := m.GetMaxWriteSpeedOrDefault() if concurrency != variable.DefTiDBDDLReorgWorkerCount { labels = append(labels, fmt.Sprintf("thread=%d", concurrency)) } if batchSize != variable.DefTiDBDDLReorgBatchSize { labels = append(labels, fmt.Sprintf("batch_size=%d", batchSize)) } + if maxWriteSpeed != variable.DefTiDBDDLReorgMaxWriteSpeed { + labels = append(labels, fmt.Sprintf("max_write_speed=%d", maxWriteSpeed)) + } if m.TargetScope != "" { labels = append(labels, fmt.Sprintf("service_scope=%s", m.TargetScope)) } diff --git a/pkg/executor/show_ddl_jobs_test.go b/pkg/executor/show_ddl_jobs_test.go index 5b9aabd3324fb..a5105f0588841 100644 --- a/pkg/executor/show_ddl_jobs_test.go +++ b/pkg/executor/show_ddl_jobs_test.go @@ -68,19 +68,21 @@ func TestShowCommentsFromJob(t *testing.T) { ReorgTp: model.ReorgTypeLitMerge, IsDistReorg: true, UseCloudStorage: true, - Concurrency: 8, - BatchSize: 1024, } + job.ReorgMeta.Concurrency.Store(8) + job.ReorgMeta.BatchSize.Store(1024) + job.ReorgMeta.MaxWriteSpeed.Store(1024 * 1024) res = showCommentsFromJob(job) - require.Equal(t, "ingest, DXF, cloud, thread=8, batch_size=1024", res) + require.Equal(t, "ingest, DXF, cloud, thread=8, batch_size=1024, max_write_speed=1048576", res) job.ReorgMeta = &model.DDLReorgMeta{ ReorgTp: model.ReorgTypeLitMerge, IsDistReorg: true, UseCloudStorage: true, - Concurrency: variable.DefTiDBDDLReorgWorkerCount, - BatchSize: variable.DefTiDBDDLReorgBatchSize, } + job.ReorgMeta.Concurrency.Store(variable.DefTiDBDDLReorgWorkerCount) + job.ReorgMeta.BatchSize.Store(variable.DefTiDBDDLReorgBatchSize) + 
job.ReorgMeta.MaxWriteSpeed.Store(variable.DefTiDBDDLReorgMaxWriteSpeed) res = showCommentsFromJob(job) require.Equal(t, "ingest, DXF, cloud", res) @@ -88,10 +90,11 @@ func TestShowCommentsFromJob(t *testing.T) { ReorgTp: model.ReorgTypeLitMerge, IsDistReorg: true, UseCloudStorage: true, - Concurrency: variable.DefTiDBDDLReorgWorkerCount, - BatchSize: variable.DefTiDBDDLReorgBatchSize, TargetScope: "background", } + job.ReorgMeta.Concurrency.Store(variable.DefTiDBDDLReorgWorkerCount) + job.ReorgMeta.BatchSize.Store(variable.DefTiDBDDLReorgBatchSize) + job.ReorgMeta.MaxWriteSpeed.Store(variable.DefTiDBDDLReorgMaxWriteSpeed) res = showCommentsFromJob(job) require.Equal(t, "ingest, DXF, cloud, service_scope=background", res) } diff --git a/pkg/lightning/backend/local/local.go b/pkg/lightning/backend/local/local.go index e2e757bb48563..eaeda026560d6 100644 --- a/pkg/lightning/backend/local/local.go +++ b/pkg/lightning/backend/local/local.go @@ -637,12 +637,7 @@ func NewBackend( return nil, common.ErrCheckMultiIngest.Wrap(err).GenWithStackByArgs() } - var writeLimiter StoreWriteLimiter - if config.StoreWriteBWLimit > 0 { - writeLimiter = newStoreWriteLimiter(config.StoreWriteBWLimit) - } else { - writeLimiter = noopStoreWriteLimiter{} - } + writeLimiter := newStoreWriteLimiter(config.StoreWriteBWLimit) local := &Backend{ pdCli: pdCli, pdHTTPCli: pdHTTPCli, diff --git a/pkg/lightning/backend/local/local_test.go b/pkg/lightning/backend/local/local_test.go index 35dd73dd36363..bac7821f33122 100644 --- a/pkg/lightning/backend/local/local_test.go +++ b/pkg/lightning/backend/local/local_test.go @@ -1223,7 +1223,7 @@ func TestCheckPeersBusy(t *testing.T) { }, }, logger: log.L(), - writeLimiter: noopStoreWriteLimiter{}, + writeLimiter: newStoreWriteLimiter(0), supportMultiIngest: true, BackendConfig: BackendConfig{ ShouldCheckWriteStall: true, @@ -1347,7 +1347,7 @@ func TestNotLeaderErrorNeedUpdatePeers(t *testing.T) { }, }, logger: log.L(), - writeLimiter: noopStoreWriteLimiter{}, + writeLimiter: newStoreWriteLimiter(0), supportMultiIngest: true, BackendConfig: BackendConfig{ ShouldCheckWriteStall: true, @@ -1446,7 +1446,7 @@ func TestPartialWriteIngestErrorWontPanic(t *testing.T) { }, }, logger: log.L(), - writeLimiter: noopStoreWriteLimiter{}, + writeLimiter: newStoreWriteLimiter(0), supportMultiIngest: true, tikvCodec: keyspace.CodecV1, BackendConfig: BackendConfig{ @@ -1542,7 +1542,7 @@ func TestPartialWriteIngestBusy(t *testing.T) { }, }, logger: log.L(), - writeLimiter: noopStoreWriteLimiter{}, + writeLimiter: newStoreWriteLimiter(0), supportMultiIngest: true, tikvCodec: keyspace.CodecV1, BackendConfig: BackendConfig{ diff --git a/pkg/lightning/backend/local/localhelper.go b/pkg/lightning/backend/local/localhelper.go index e042a78c5cdf5..402fbe3ab99cf 100644 --- a/pkg/lightning/backend/local/localhelper.go +++ b/pkg/lightning/backend/local/localhelper.go @@ -19,6 +19,7 @@ import ( "context" "math" "sync" + "sync/atomic" "time" "github.com/docker/go-units" @@ -133,33 +134,46 @@ func largerStartKey(a, b []byte) []byte { type StoreWriteLimiter interface { WaitN(ctx context.Context, storeID uint64, n int) error Limit() int + UpdateLimit(limit int) } type storeWriteLimiter struct { rwm sync.RWMutex limiters map[uint64]*rate.Limiter - limit int - burst int + // limit and burst can only be non-negative, 0 means no rate limiting. + limit atomic.Int64 + burst atomic.Int64 } func newStoreWriteLimiter(limit int) *storeWriteLimiter { - var burst int - // Allow burst of at most 20% of the limit. 
- if limit <= math.MaxInt-limit/5 { - burst = limit + limit/5 + l, b := calculateLimitAndBurst(limit) + s := &storeWriteLimiter{ + limiters: make(map[uint64]*rate.Limiter), + } + s.limit.Store(l) + s.burst.Store(b) + return s +} + +func calculateLimitAndBurst(writeLimit int) (limit int64, burst int64) { + if writeLimit <= 0 { + return 0, 0 + } + // Allow burst of at most 20% of the writeLimit. + if writeLimit <= math.MaxInt-writeLimit/5 { + burst = int64(writeLimit) + int64(writeLimit)/5 } else { // If overflowed, set burst to math.MaxInt. burst = math.MaxInt } - return &storeWriteLimiter{ - limiters: make(map[uint64]*rate.Limiter), - limit: limit, - burst: burst, - } + return int64(writeLimit), burst } func (s *storeWriteLimiter) WaitN(ctx context.Context, storeID uint64, n int) error { limiter := s.getLimiter(storeID) + if limiter == nil { + return nil + } // The original WaitN doesn't allow n > burst, // so we call WaitN with burst multiple times. for n > limiter.Burst() { @@ -172,10 +186,13 @@ func (s *storeWriteLimiter) WaitN(ctx context.Context, storeID uint64, n int) er } func (s *storeWriteLimiter) Limit() int { - return s.limit + return int(s.limit.Load()) } func (s *storeWriteLimiter) getLimiter(storeID uint64) *rate.Limiter { + if s.limit.Load() == 0 { + return nil + } s.rwm.RLock() limiter, ok := s.limiters[storeID] s.rwm.RUnlock() @@ -186,20 +203,31 @@ func (s *storeWriteLimiter) getLimiter(storeID uint64) *rate.Limiter { defer s.rwm.Unlock() limiter, ok = s.limiters[storeID] if !ok { - limiter = rate.NewLimiter(rate.Limit(s.limit), s.burst) + limiter = rate.NewLimiter(rate.Limit(s.limit.Load()), int(s.burst.Load())) s.limiters[storeID] = limiter } return limiter } -type noopStoreWriteLimiter struct{} - -func (noopStoreWriteLimiter) WaitN(_ context.Context, _ uint64, _ int) error { - return nil -} +func (s *storeWriteLimiter) UpdateLimit(newLimit int) { + limit, burst := calculateLimitAndBurst(newLimit) + if s.limit.Load() == limit { + return + } -func (noopStoreWriteLimiter) Limit() int { - return math.MaxInt + s.limit.Store(limit) + s.burst.Store(burst) + // Update all existing limiters with the new limit and burst values. + s.rwm.Lock() + defer s.rwm.Unlock() + if s.limit.Load() == 0 { + s.limiters = make(map[uint64]*rate.Limiter) + return + } + for _, limiter := range s.limiters { + limiter.SetLimit(rate.Limit(s.limit.Load())) + limiter.SetBurst(int(s.burst.Load())) + } } // compaction threshold diff --git a/pkg/lightning/backend/local/localhelper_test.go b/pkg/lightning/backend/local/localhelper_test.go index 4baa322c81e49..f107318422e0b 100644 --- a/pkg/lightning/backend/local/localhelper_test.go +++ b/pkg/lightning/backend/local/localhelper_test.go @@ -325,3 +325,40 @@ func TestStoreWriteLimiter(t *testing.T) { } wg.Wait() } + +func TestTuneStoreWriteLimiter(t *testing.T) { + limiter := newStoreWriteLimiter(100) + testLimiter := func(ctx context.Context, maxT int) { + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + go func(storeID uint64) { + defer wg.Done() + start := time.Now() + var gotTokens int + for { + n := rand.Intn(50) + if limiter.WaitN(ctx, storeID, n) != nil { + break + } + gotTokens += n + } + elapsed := time.Since(start) + maxTokens := int(1.2*float64(maxT)) + int(elapsed.Seconds()*float64(maxT)) + // In theory, gotTokens should be less than or equal to maxT. + // But we allow a little of error to avoid the test being flaky. 
+ require.LessOrEqual(t, gotTokens, maxTokens+1) + }(uint64(i)) + } + wg.Wait() + } + + ctx0, cancel0 := context.WithTimeout(context.Background(), time.Second*2) + defer cancel0() + testLimiter(ctx0, 100) + + limiter.UpdateLimit(200) + ctx1, cancel1 := context.WithTimeout(context.Background(), time.Second*2) + defer cancel1() + testLimiter(ctx1, 200) +} diff --git a/pkg/lightning/backend/local/region_job.go b/pkg/lightning/backend/local/region_job.go index 14f1c9a8191e5..891a8cdffb261 100644 --- a/pkg/lightning/backend/local/region_job.go +++ b/pkg/lightning/backend/local/region_job.go @@ -780,6 +780,16 @@ func (local *Backend) doIngest(ctx context.Context, j *regionJob) (*sst.IngestRe return resp, nil } +// UpdateWriteSpeedLimit updates the write limiter of the backend. +func (local *Backend) UpdateWriteSpeedLimit(limit int) { + local.writeLimiter.UpdateLimit(limit) +} + +// GetWriteSpeedLimit returns the speed of the write limiter. +func (local *Backend) GetWriteSpeedLimit() int { + return local.writeLimiter.Limit() +} + // convertStageOnIngestError will try to fix the error contained in ingest response. // Return (_, error) when another error occurred. // Return (true, nil) when the job can retry ingesting immediately. diff --git a/pkg/lightning/backend/local/region_job_test.go b/pkg/lightning/backend/local/region_job_test.go index 3a34c4f6e0f0d..c22e223554630 100644 --- a/pkg/lightning/backend/local/region_job_test.go +++ b/pkg/lightning/backend/local/region_job_test.go @@ -579,3 +579,25 @@ func TestStoreBalancerNoRace(t *testing.T) { <-done2 require.Len(t, jobFromWorkerCh, 0) } + +func TestUpdateAndGetLimiterConcurrencySafety(t *testing.T) { + backend := &Backend{ + writeLimiter: newStoreWriteLimiter(0), + } + + var wg sync.WaitGroup + concurrentRoutines := 100 + for i := 0; i < concurrentRoutines; i++ { + wg.Add(2) + go func(limit int) { + defer wg.Done() + backend.UpdateWriteSpeedLimit(limit) + }(i) + + go func() { + defer wg.Done() + _ = backend.GetWriteSpeedLimit() + }() + } + wg.Wait() +} diff --git a/pkg/meta/model/BUILD.bazel b/pkg/meta/model/BUILD.bazel index 65516797fc58e..0dd924c3903ae 100644 --- a/pkg/meta/model/BUILD.bazel +++ b/pkg/meta/model/BUILD.bazel @@ -30,6 +30,7 @@ go_library( "//pkg/util/intest", "@com_github_pingcap_errors//:errors", "@com_github_tikv_pd_client//http", + "@org_uber_go_atomic//:atomic", ], ) diff --git a/pkg/meta/model/reorg.go b/pkg/meta/model/reorg.go index fc117cc8679a4..723a3fda3a9b7 100644 --- a/pkg/meta/model/reorg.go +++ b/pkg/meta/model/reorg.go @@ -16,11 +16,11 @@ package model import ( "encoding/json" - "sync/atomic" "github.com/pingcap/errors" "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/parser/terror" + "go.uber.org/atomic" ) // BackfillState is the state used by the backfill-merge process. @@ -75,15 +75,16 @@ type DDLReorgMeta struct { // These two variables are used to control the concurrency and batch size of the reorganization process. // They can be adjusted dynamically through `admin alter ddl jobs` command. // Note: Don't get or set these two variables directly, use the functions instead. - Concurrency int64 `json:"concurrency"` - BatchSize int64 `json:"batch_size"` + Concurrency atomic.Int64 `json:"concurrency"` + BatchSize atomic.Int64 `json:"batch_size"` + MaxWriteSpeed atomic.Int64 `json:"max_write_speed"` } // GetConcurrencyOrDefault gets the concurrency from DDLReorgMeta, // pass the default value in case of the reorg meta coming from old cluster and Concurrency is 0. 
func (dm *DDLReorgMeta) GetConcurrencyOrDefault(defaultVal int) int { - concurrency := atomic.LoadInt64(&dm.Concurrency) - if dm == nil || concurrency == 0 { + concurrency := dm.Concurrency.Load() + if concurrency == 0 { return defaultVal } return int(concurrency) @@ -91,13 +92,13 @@ func (dm *DDLReorgMeta) GetConcurrencyOrDefault(defaultVal int) int { // SetConcurrency sets the concurrency in DDLReorgMeta. func (dm *DDLReorgMeta) SetConcurrency(concurrency int) { - atomic.StoreInt64(&dm.Concurrency, int64(concurrency)) + dm.Concurrency.Store(int64(concurrency)) } // GetBatchSizeOrDefault gets the batch size from DDLReorgMeta. func (dm *DDLReorgMeta) GetBatchSizeOrDefault(defaultVal int) int { - batchSize := atomic.LoadInt64(&dm.BatchSize) - if dm == nil || batchSize == 0 { + batchSize := dm.BatchSize.Load() + if batchSize == 0 { return defaultVal } return int(batchSize) @@ -105,7 +106,18 @@ func (dm *DDLReorgMeta) GetBatchSizeOrDefault(defaultVal int) int { // SetBatchSize sets the batch size in DDLReorgMeta. func (dm *DDLReorgMeta) SetBatchSize(batchSize int) { - atomic.StoreInt64(&dm.BatchSize, int64(batchSize)) + dm.BatchSize.Store(int64(batchSize)) +} + +// GetMaxWriteSpeedOrDefault gets the max write speed from DDLReorgMeta. +// 0 means no limit. +func (dm *DDLReorgMeta) GetMaxWriteSpeedOrDefault() int { + return int(dm.MaxWriteSpeed.Load()) +} + +// SetMaxWriteSpeed sets the max write speed in DDLReorgMeta. +func (dm *DDLReorgMeta) SetMaxWriteSpeed(maxWriteSpeed int) { + dm.MaxWriteSpeed.Store(int64(maxWriteSpeed)) } const ( diff --git a/pkg/planner/core/BUILD.bazel b/pkg/planner/core/BUILD.bazel index fa4bf2170eadc..cee3079583f42 100644 --- a/pkg/planner/core/BUILD.bazel +++ b/pkg/planner/core/BUILD.bazel @@ -196,6 +196,7 @@ go_library( "//pkg/util/tiflashcompute", "//pkg/util/tracing", "@com_github_bits_and_blooms_bitset//:bitset", + "@com_github_docker_go_units//:go-units", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_kvproto//pkg/coprocessor", @@ -325,6 +326,7 @@ go_test( "//pkg/util/ranger", "//pkg/util/stmtsummary", "//pkg/util/tracing", + "@com_github_docker_go_units//:go-units", "@com_github_golang_snappy//:snappy", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", diff --git a/pkg/planner/core/planbuilder.go b/pkg/planner/core/planbuilder.go index 100d70e810789..9558a0417fd03 100644 --- a/pkg/planner/core/planbuilder.go +++ b/pkg/planner/core/planbuilder.go @@ -23,6 +23,7 @@ import ( "strconv" "strings" + "github.com/docker/go-units" "github.com/pingcap/errors" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/pkg/bindinfo" @@ -5930,18 +5931,62 @@ func (b *PlanBuilder) buildAdminAlterDDLJob(ctx context.Context, as *ast.AdminSt func checkAlterDDLJobOptValue(opt *AlterDDLJobOpt) error { switch opt.Name { case AlterDDLJobThread: - thread := opt.Value.(*expression.Constant).Value.GetInt64() + thread, err := GetThreadOrBatchSizeFromExpression(opt) + if err != nil { + return err + } if thread < 1 || thread > variable.MaxConfigurableConcurrency { return fmt.Errorf("the value %v for %s is out of range [1, %v]", thread, opt.Name, variable.MaxConfigurableConcurrency) } case AlterDDLJobBatchSize: - batchSize := opt.Value.(*expression.Constant).Value.GetInt64() + batchSize, err := GetThreadOrBatchSizeFromExpression(opt) + if err != nil { + return err + } bs := int32(batchSize) if bs < variable.MinDDLReorgBatchSize || bs > 
variable.MaxDDLReorgBatchSize { return fmt.Errorf("the value %v for %s is out of range [%v, %v]", bs, opt.Name, variable.MinDDLReorgBatchSize, variable.MaxDDLReorgBatchSize) } + case AlterDDLJobMaxWriteSpeed: + speed, err := GetMaxWriteSpeedFromExpression(opt) + if err != nil { + return err + } + if speed < 0 || speed > units.PiB { + return fmt.Errorf("the value %s for %s is out of range [%v, %v]", + strconv.FormatInt(speed, 10), opt.Name, 0, units.PiB) + } } return nil } + +// GetThreadOrBatchSizeFromExpression gets the numeric value of the thread or batch size from the expression. +func GetThreadOrBatchSizeFromExpression(opt *AlterDDLJobOpt) (int64, error) { + v := opt.Value.(*expression.Constant) + switch v.RetType.EvalType() { + case types.ETInt: + return v.Value.GetInt64(), nil + default: + return 0, fmt.Errorf("the value for %s is invalid, only integer is allowed", opt.Name) + } +} + +// GetMaxWriteSpeedFromExpression gets the numeric value of the max write speed from the expression. +func GetMaxWriteSpeedFromExpression(opt *AlterDDLJobOpt) (maxWriteSpeed int64, err error) { + v := opt.Value.(*expression.Constant) + switch v.RetType.EvalType() { + case types.ETString: + speedStr := v.Value.GetString() + maxWriteSpeed, err = units.RAMInBytes(speedStr) + if err != nil { + return 0, errors.Annotate(err, "parse max_write_speed value error") + } + case types.ETInt: + maxWriteSpeed = v.Value.GetInt64() + default: + return 0, fmt.Errorf("the value %v for %s is invalid", v.Value.GetValue(), opt.Name) + } + return maxWriteSpeed, nil +} diff --git a/pkg/planner/core/planbuilder_test.go b/pkg/planner/core/planbuilder_test.go index d2c416960b7da..eda77c5f33981 100644 --- a/pkg/planner/core/planbuilder_test.go +++ b/pkg/planner/core/planbuilder_test.go @@ -17,12 +17,14 @@ package core import ( "context" "fmt" + "math/rand" "reflect" "sort" "strings" "testing" "unsafe" + "github.com/docker/go-units" "github.com/pingcap/errors" "github.com/pingcap/tidb/pkg/domain" "github.com/pingcap/tidb/pkg/expression" @@ -892,7 +894,7 @@ func TestBuildAdminAlterDDLJobPlan(t *testing.T) { plan, ok := p.(*AlterDDLJob) require.True(t, ok) require.Equal(t, plan.JobID, int64(1)) - require.Equal(t, len(plan.Options), 1) + require.Len(t, plan.Options, 1) require.Equal(t, plan.Options[0].Name, AlterDDLJobThread) cons, ok := plan.Options[0].Value.(*expression.Constant) require.True(t, ok) @@ -905,7 +907,7 @@ func TestBuildAdminAlterDDLJobPlan(t *testing.T) { plan, ok = p.(*AlterDDLJob) require.True(t, ok) require.Equal(t, plan.JobID, int64(2)) - require.Equal(t, len(plan.Options), 1) + require.Len(t, plan.Options, 1) require.Equal(t, plan.Options[0].Name, AlterDDLJobBatchSize) cons, ok = plan.Options[0].Value.(*expression.Constant) require.True(t, ok) @@ -918,20 +920,33 @@ func TestBuildAdminAlterDDLJobPlan(t *testing.T) { plan, ok = p.(*AlterDDLJob) require.True(t, ok) require.Equal(t, plan.JobID, int64(3)) - require.Equal(t, len(plan.Options), 1) + require.Len(t, plan.Options, 1) require.Equal(t, plan.Options[0].Name, AlterDDLJobMaxWriteSpeed) cons, ok = plan.Options[0].Value.(*expression.Constant) require.True(t, ok) require.Equal(t, cons.Value.GetString(), "10MiB") - stmt, err = parser.ParseOneStmt("admin alter ddl jobs 4 thread = 16, batch_size = 512, max_write_speed = '10MiB' ", "", "") + stmt, err = parser.ParseOneStmt("admin alter ddl jobs 4 max_write_speed = 1024", "", "") require.NoError(t, err) p, err = builder.Build(ctx, resolve.NewNodeW(stmt)) require.NoError(t, err) plan, ok 
= p.(*AlterDDLJob) require.True(t, ok) require.Equal(t, plan.JobID, int64(4)) - require.Equal(t, len(plan.Options), 3) + require.Len(t, plan.Options, 1) + require.Equal(t, AlterDDLJobMaxWriteSpeed, plan.Options[0].Name) + cons, ok = plan.Options[0].Value.(*expression.Constant) + require.True(t, ok) + require.EqualValues(t, 1024, cons.Value.GetInt64()) + + stmt, err = parser.ParseOneStmt("admin alter ddl jobs 5 thread = 16, batch_size = 512, max_write_speed = '10MiB' ", "", "") + require.NoError(t, err) + p, err = builder.Build(ctx, resolve.NewNodeW(stmt)) + require.NoError(t, err) + plan, ok = p.(*AlterDDLJob) + require.True(t, ok) + require.Equal(t, plan.JobID, int64(5)) + require.Len(t, plan.Options, 3) sort.Slice(plan.Options, func(i, j int) bool { return plan.Options[i].Name < plan.Options[j].Name }) @@ -953,3 +968,33 @@ func TestBuildAdminAlterDDLJobPlan(t *testing.T) { _, err = builder.Build(ctx, resolve.NewNodeW(stmt)) require.Equal(t, err.Error(), "unsupported admin alter ddl jobs config: aaa") } + +func TestGetMaxWriteSpeedFromExpression(t *testing.T) { + parser := parser.New() + sctx := MockContext() + ctx := context.TODO() + builder, _ := NewPlanBuilder().Init(sctx, nil, hint.NewQBHintHandler(nil)) + // random speed value + n := rand.Intn(units.PiB + 1) + stmt, err := parser.ParseOneStmt(fmt.Sprintf("admin alter ddl jobs 1 max_write_speed = %d ", n), "", "") + require.NoError(t, err) + p, err := builder.Build(ctx, resolve.NewNodeW(stmt)) + require.NoError(t, err) + plan, ok := p.(*AlterDDLJob) + require.True(t, ok) + require.Equal(t, plan.JobID, int64(1)) + require.Len(t, plan.Options, 1) + require.Equal(t, plan.Options[0].Name, AlterDDLJobMaxWriteSpeed) + _, ok = plan.Options[0].Value.(*expression.Constant) + require.True(t, ok) + maxWriteSpeed, err := GetMaxWriteSpeedFromExpression(plan.Options[0]) + require.NoError(t, err) + require.Equal(t, int64(n), maxWriteSpeed) + // parse speed string error + opt := &AlterDDLJobOpt{ + Name: "test", + Value: expression.NewStrConst("MiB"), + } + _, err = GetMaxWriteSpeedFromExpression(opt) + require.Equal(t, "parse max_write_speed value error: invalid size: 'MiB'", err.Error()) +} diff --git a/tests/realtikvtest/addindextest3/functional_test.go b/tests/realtikvtest/addindextest3/functional_test.go index aaa6eb4e29764..6e37fb5e04bcd 100644 --- a/tests/realtikvtest/addindextest3/functional_test.go +++ b/tests/realtikvtest/addindextest3/functional_test.go @@ -85,7 +85,7 @@ func TestDDLTestEstimateTableRowSize(t *testing.T) { func TestBackendCtxConcurrentUnregister(t *testing.T) { store := realtikvtest.CreateMockStoreAndSetup(t) discovery := store.(tikv.Storage).GetRegionCache().PDClient().GetServiceDiscovery() - bCtx, err := ingest.LitBackCtxMgr.Register(context.Background(), 1, false, nil, discovery, "test", 1, 0) + bCtx, err := ingest.LitBackCtxMgr.Register(context.Background(), 1, false, nil, discovery, "test", 1, 0, 0) require.NoError(t, err) idxIDs := []int64{1, 2, 3, 4, 5, 6, 7} uniques := make([]bool, 0, len(idxIDs))
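
A minimal usage sketch of the option this patch adds, based on the statements exercised in the tests above (the job ID 123 is illustrative, not taken from the patch):

-- Throttle ingest writes of a running reorganization job to about 10 MiB/s per TiKV store.
-- Size strings ('2KB', '3MiB', '4 gb') and plain byte counts are both accepted,
-- within the range [0, 1 PiB]; 0 removes the limit and is also the default.
admin alter ddl jobs 123 max_write_speed = '10MiB';

-- A non-default value is surfaced in the job comments as max_write_speed=<bytes>.
admin show ddl jobs;

The running job picks the new value up without a restart: adjustWorkerCntAndMaxWriteSpeed polls the job's ReorgMeta on a ticker and, when the stored value differs from the local backend's current limit, calls UpdateWriteSpeedLimit, which retunes the per-store rate.Limiter instances in place via SetLimit/SetBurst.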