ddl: dynamically adjusting the max write speed of reorganization job (#57611) #57690

Merged
47 changes: 31 additions & 16 deletions pkg/ddl/backfilling.go
@@ -708,8 +708,9 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode(
//nolint: forcetypeassert
discovery := dc.store.(tikv.Storage).GetRegionCache().PDClient().GetServiceDiscovery()
importConc := job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter()))
maxWriteSpeed := job.ReorgMeta.GetMaxWriteSpeedOrDefault()
bcCtx, err := ingest.LitBackCtxMgr.Register(
ctx, job.ID, hasUnique, nil, discovery, job.ReorgMeta.ResourceGroupName, importConc, job.RealStartTS)
ctx, job.ID, hasUnique, nil, discovery, job.ReorgMeta.ResourceGroupName, importConc, maxWriteSpeed, job.RealStartTS)
if err != nil {
return errors.Trace(err)
}
@@ -776,7 +777,7 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode(
if err != nil {
return err
}
err = executeAndClosePipeline(opCtx, pipe, job, avgRowSize)
err = executeAndClosePipeline(opCtx, pipe, job, bcCtx, avgRowSize)
if err != nil {
err1 := bcCtx.FinishAndUnregisterEngines(ingest.OptCloseEngines)
if err1 != nil {
@@ -793,7 +794,7 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode(
return bcCtx.FinishAndUnregisterEngines(ingest.OptCleanData | ingest.OptCheckDup)
}

func adjustWorkerPoolSize(ctx context.Context, pipe *operator.AsyncPipeline, job *model.Job, avgRowSize int) {
func adjustWorkerCntAndMaxWriteSpeed(ctx context.Context, pipe *operator.AsyncPipeline, job *model.Job, bcCtx ingest.BackendCtx, avgRowSize int) {
opR, opW := pipe.GetLocalIngestModeReaderAndWriter()
if opR == nil || opW == nil {
logutil.DDLIngestLogger().Error("failed to get local ingest mode reader or writer", zap.Int64("jobID", job.ID))
@@ -817,36 +818,50 @@ func adjustWorkerPoolSize(ctx context.Context, pipe *operator.AsyncPipeline, job
case <-ctx.Done():
return
case <-ticker.C:
targetReaderCnt, targetWriterCnt := expectedIngestWorkerCnt(
job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())), avgRowSize)
maxWriteSpeed := job.ReorgMeta.GetMaxWriteSpeedOrDefault()
if maxWriteSpeed != bcCtx.GetLocalBackend().GetWriteSpeedLimit() {
bcCtx.GetLocalBackend().UpdateWriteSpeedLimit(maxWriteSpeed)
logutil.DDLIngestLogger().Info("adjust ddl job config success",
zap.Int64("jobID", job.ID),
zap.Int("max write speed", bcCtx.GetLocalBackend().GetWriteSpeedLimit()))
}

concurrency := job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter()))
targetReaderCnt, targetWriterCnt := expectedIngestWorkerCnt(concurrency, avgRowSize)
currentReaderCnt, currentWriterCnt := reader.GetWorkerPoolSize(), writer.GetWorkerPoolSize()
if int32(targetReaderCnt) == currentReaderCnt && int32(targetWriterCnt) == currentWriterCnt {
continue
if int32(targetReaderCnt) != currentReaderCnt || int32(targetWriterCnt) != currentWriterCnt {
reader.TuneWorkerPoolSize(int32(targetReaderCnt))
writer.TuneWorkerPoolSize(int32(targetWriterCnt))
logutil.DDLIngestLogger().Info("adjust ddl job config success",
zap.Int64("jobID", job.ID),
zap.Int32("table scan operator count", reader.GetWorkerPoolSize()),
zap.Int32("index ingest operator count", writer.GetWorkerPoolSize()))
}
reader.TuneWorkerPoolSize(int32(targetReaderCnt))
writer.TuneWorkerPoolSize(int32(targetWriterCnt))
logutil.DDLIngestLogger().Info("adjust ddl job config success",
zap.Int64("jobID", job.ID),
zap.Int32("table scan operator count", reader.GetWorkerPoolSize()),
zap.Int32("index ingest operator count", writer.GetWorkerPoolSize()))
}
}
}

func executeAndClosePipeline(ctx *OperatorCtx, pipe *operator.AsyncPipeline, job *model.Job, avgRowSize int) error {
func executeAndClosePipeline(ctx *OperatorCtx, pipe *operator.AsyncPipeline, job *model.Job, bcCtx ingest.BackendCtx, avgRowSize int) error {
err := pipe.Execute()
if err != nil {
return err
}

// Adjust worker pool size dynamically.
// Adjust worker pool size and max write speed dynamically.
var wg util.WaitGroupWrapper
adjustCtx, cancel := context.WithCancel(ctx)
if job != nil {
wg.Add(1)
go func() {
adjustWorkerPoolSize(ctx, pipe, job, avgRowSize)
defer wg.Done()
adjustWorkerCntAndMaxWriteSpeed(adjustCtx, pipe, job, bcCtx, avgRowSize)
}()
}

err = pipe.Close()

cancel()
wg.Wait() // wait for adjustWorkerCntAndMaxWriteSpeed to exit
if opErr := ctx.OperatorErr(); opErr != nil {
return opErr
}
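The backfilling.go hunks above add a background goroutine that re-reads the job's ReorgMeta on a ticker, applies a changed max write speed to the local backend and a changed concurrency to the pipeline workers, and is shut down from executeAndClosePipeline via context cancellation plus a WaitGroup. A minimal, self-contained sketch of that polling pattern follows; all names here are hypothetical stand-ins, not the TiDB types.

package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// backend stands in for the local ingest backend; the real code calls
// bcCtx.GetLocalBackend().GetWriteSpeedLimit()/UpdateWriteSpeedLimit().
type backend struct{ limit atomic.Int64 }

func (b *backend) GetWriteSpeedLimit() int64     { return b.limit.Load() }
func (b *backend) UpdateWriteSpeedLimit(v int64) { b.limit.Store(v) }

// adjustLoop mirrors the shape of adjustWorkerCntAndMaxWriteSpeed: poll the
// desired value, apply it only when it differs, and exit on cancellation.
func adjustLoop(ctx context.Context, desired *atomic.Int64, b *backend) {
	ticker := time.NewTicker(100 * time.Millisecond) // the real interval is longer
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if want := desired.Load(); want != b.GetWriteSpeedLimit() {
				b.UpdateWriteSpeedLimit(want)
				fmt.Println("write speed limit adjusted to", want)
			}
		}
	}
}

func main() {
	var desired atomic.Int64
	b := &backend{}
	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		adjustLoop(ctx, &desired, b)
	}()

	// Simulates `admin alter ddl jobs <id> max_write_speed = '32MiB'` landing
	// in the job's ReorgMeta while the pipeline is still running.
	desired.Store(32 << 20)
	time.Sleep(300 * time.Millisecond)

	cancel()  // same shutdown order as executeAndClosePipeline: cancel first,
	wg.Wait() // then wait for the adjuster goroutine to exit.
}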
1 change: 1 addition & 0 deletions pkg/ddl/backfilling_dist_executor.go
@@ -154,6 +154,7 @@ func (s *backfillDistExecutor) getBackendCtx() (ingest.BackendCtx, error) {
discovery,
job.ReorgMeta.ResourceGroupName,
job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())),
job.ReorgMeta.GetMaxWriteSpeedOrDefault(),
job.RealStartTS,
)
}
4 changes: 2 additions & 2 deletions pkg/ddl/backfilling_read_index.go
@@ -114,14 +114,14 @@ func (r *readIndexExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta
if err != nil {
return err
}
return executeAndClosePipeline(opCtx, pipe, nil, 0)
return executeAndClosePipeline(opCtx, pipe, nil, nil, 0)
}

pipe, err := r.buildLocalStorePipeline(opCtx, sm, subtask.Concurrency)
if err != nil {
return err
}
err = executeAndClosePipeline(opCtx, pipe, nil, 0)
err = executeAndClosePipeline(opCtx, pipe, nil, nil, 0)
if err != nil {
// For dist task local based ingest, checkpoint is unsupported.
// If there is an error we should keep local sort dir clean.
7 changes: 3 additions & 4 deletions pkg/ddl/backfilling_test.go
@@ -489,10 +489,9 @@ func TestValidateAndFillRanges(t *testing.T) {
}

func TestTuneTableScanWorkerBatchSize(t *testing.T) {
reorgMeta := &model.DDLReorgMeta{
Concurrency: 4,
BatchSize: 32,
}
reorgMeta := &model.DDLReorgMeta{}
reorgMeta.Concurrency.Store(4)
reorgMeta.BatchSize.Store(32)
copCtx := &copr.CopContextSingleIndex{
CopContextBase: &copr.CopContextBase{
FieldTypes: []*types.FieldType{},
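The test above now writes Concurrency and BatchSize through atomic fields instead of a struct literal, which is what lets the adjuster goroutine read the knobs while an `admin alter ddl jobs` statement updates them. A rough sketch of that accessor pattern, under the assumption that a zero value means "unset" and falls back to the default; the field and method bodies are illustrative, not the exact model.DDLReorgMeta implementation.

package main

import (
	"fmt"
	"sync/atomic"
)

// reorgMeta is a stand-in for model.DDLReorgMeta with one atomic knob.
type reorgMeta struct {
	Concurrency atomic.Int64
}

// GetConcurrencyOrDefault assumes 0 means "never set" and falls back to the
// caller-supplied default, mirroring the GetConcurrencyOrDefault calls above.
func (m *reorgMeta) GetConcurrencyOrDefault(def int) int {
	if v := m.Concurrency.Load(); v > 0 {
		return int(v)
	}
	return def
}

func (m *reorgMeta) SetConcurrency(c int) { m.Concurrency.Store(int64(c)) }

func main() {
	m := &reorgMeta{}
	fmt.Println(m.GetConcurrencyOrDefault(4)) // 4: unset, default wins
	m.SetConcurrency(8)                       // what `admin alter ddl jobs <id> thread = 8` would store
	fmt.Println(m.GetConcurrencyOrDefault(4)) // 8
}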
48 changes: 36 additions & 12 deletions pkg/ddl/db_test.go
@@ -1161,13 +1161,12 @@ func TestAdminAlterDDLJobUpdateSysTable(t *testing.T) {
tk.MustExec("create table t (a int);")

job := model.Job{
ID: 1,
Type: model.ActionAddIndex,
ReorgMeta: &model.DDLReorgMeta{
Concurrency: 4,
BatchSize: 128,
},
ID: 1,
Type: model.ActionAddIndex,
ReorgMeta: &model.DDLReorgMeta{},
}
job.ReorgMeta.Concurrency.Store(4)
job.ReorgMeta.BatchSize.Store(128)
insertMockJob2Table(tk, &job)
tk.MustExec(fmt.Sprintf("admin alter ddl jobs %d thread = 8;", job.ID))
j := getJobMetaByID(t, tk, job.ID)
@@ -1193,8 +1192,34 @@ func TestAdminAlterDDLJobUnsupportedCases(t *testing.T) {
// invalid config value
tk.MustGetErrMsg("admin alter ddl jobs 1 thread = 0;", "the value 0 for thread is out of range [1, 256]")
tk.MustGetErrMsg("admin alter ddl jobs 1 thread = 257;", "the value 257 for thread is out of range [1, 256]")
tk.MustGetErrMsg("admin alter ddl jobs 1 thread = 10.5;", "the value for thread is invalid, only integer is allowed")
tk.MustGetErrMsg("admin alter ddl jobs 1 thread = '16';", "the value for thread is invalid, only integer is allowed")
tk.MustGetErrMsg("admin alter ddl jobs 1 thread = '';", "the value for thread is invalid, only integer is allowed")
tk.MustGetErrMsg("admin alter ddl jobs 1 batch_size = 31;", "the value 31 for batch_size is out of range [32, 10240]")
tk.MustGetErrMsg("admin alter ddl jobs 1 batch_size = 10241;", "the value 10241 for batch_size is out of range [32, 10240]")
tk.MustGetErrMsg("admin alter ddl jobs 1 batch_size = 321.3;", "the value for batch_size is invalid, only integer is allowed")
tk.MustGetErrMsg("admin alter ddl jobs 1 batch_size = '512';", "the value for batch_size is invalid, only integer is allowed")
tk.MustGetErrMsg("admin alter ddl jobs 1 batch_size = '';", "the value for batch_size is invalid, only integer is allowed")
tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = '2PiB';", "the value 2251799813685248 for max_write_speed is out of range [0, 1125899906842624]")
tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = -1;", "the value -1 for max_write_speed is out of range [0, 1125899906842624]")
tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = 1.23;", "the value 1.23 for max_write_speed is invalid")
tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = 'MiB';", "parse max_write_speed value error: invalid size: 'MiB'")
tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = 'asd';", "parse max_write_speed value error: invalid size: 'asd'")
tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = '';", "parse max_write_speed value error: invalid size: ''")
tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = '20xl';", "parse max_write_speed value error: invalid suffix: 'xl'")
tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = 1.2.3;", "[parser:1064]You have an error in your SQL syntax; check the manual that corresponds to your TiDB version for the right syntax to use line 1 column 46 near \".3;\" ")
tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = 20+30;", "[parser:1064]You have an error in your SQL syntax; check the manual that corresponds to your TiDB version for the right syntax to use line 1 column 44 near \"+30;\" ")
tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = rand();", "[parser:1064]You have an error in your SQL syntax; check the manual that corresponds to your TiDB version for the right syntax to use line 1 column 45 near \"rand();\" ")
// valid config value
tk.MustGetErrMsg("admin alter ddl jobs 1 thread = 16;", "ddl job 1 is not running")
tk.MustGetErrMsg("admin alter ddl jobs 1 batch_size = 64;", "ddl job 1 is not running")
tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = '0';", "ddl job 1 is not running")
tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = '64';", "ddl job 1 is not running")
tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = '2KB';", "ddl job 1 is not running")
tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = '3MiB';", "ddl job 1 is not running")
tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = '4 gb';", "ddl job 1 is not running")
tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = 1;", "ddl job 1 is not running")
tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = '1.23';", "ddl job 1 is not running")

// invalid job id
tk.MustGetErrMsg("admin alter ddl jobs 1 thread = 8;", "ddl job 1 is not running")
@@ -1232,13 +1257,12 @@ func TestAdminAlterDDLJobCommitFailed(t *testing.T) {
defer testfailpoint.Disable(t, "github.com/pingcap/tidb/pkg/executor/mockAlterDDLJobCommitFailed")

job := model.Job{
ID: 1,
Type: model.ActionAddIndex,
ReorgMeta: &model.DDLReorgMeta{
Concurrency: 4,
BatchSize: 128,
},
ID: 1,
Type: model.ActionAddIndex,
ReorgMeta: &model.DDLReorgMeta{},
}
job.ReorgMeta.Concurrency.Store(4)
job.ReorgMeta.BatchSize.Store(128)
insertMockJob2Table(tk, &job)
tk.MustGetErrMsg(fmt.Sprintf("admin alter ddl jobs %d thread = 8, batch_size = 256;", job.ID),
"mock commit failed on admin alter ddl jobs")
1 change: 1 addition & 0 deletions pkg/ddl/executor.go
@@ -4928,6 +4928,7 @@ func initJobReorgMetaFromVariables(job *model.Job, sctx sessionctx.Context) erro
if sv, ok := sctx.GetSessionVars().GetSystemVar(variable.TiDBDDLReorgBatchSize); ok {
m.SetBatchSize(variable.TidbOptInt(sv, 0))
}
m.SetMaxWriteSpeed(int(variable.DDLReorgMaxWriteSpeed.Load()))
}

setDistTaskParam := func() error {
2 changes: 1 addition & 1 deletion pkg/ddl/index.go
@@ -2429,7 +2429,7 @@ func checkDuplicateForUniqueIndex(ctx context.Context, t table.Table, reorgInfo
ctx := tidblogutil.WithCategory(ctx, "ddl-ingest")
if bc == nil {
bc, err = ingest.LitBackCtxMgr.Register(
ctx, reorgInfo.ID, indexInfo.Unique, nil, discovery, reorgInfo.ReorgMeta.ResourceGroupName, 1, reorgInfo.RealStartTS)
ctx, reorgInfo.ID, indexInfo.Unique, nil, discovery, reorgInfo.ReorgMeta.ResourceGroupName, 1, 0, reorgInfo.RealStartTS)
if err != nil {
return err
}
4 changes: 3 additions & 1 deletion pkg/ddl/ingest/backend_mgr.go
@@ -51,6 +51,7 @@ type BackendCtxMgr interface {
pdSvcDiscovery pd.ServiceDiscovery,
resourceGroupName string,
importConc int,
maxWriteSpeed int,
initTS uint64,
) (BackendCtx, error)
Unregister(jobID int64)
@@ -118,6 +119,7 @@ func (m *litBackendCtxMgr) Register(
pdSvcDiscovery pd.ServiceDiscovery,
resourceGroupName string,
concurrency int,
maxWriteSpeed int,
initTS uint64,
) (BackendCtx, error) {
bc, exist := m.Load(jobID)
@@ -136,7 +138,7 @@ func (m *litBackendCtxMgr) Register(
logutil.Logger(ctx).Error(LitErrCreateDirFail, zap.Error(err))
return nil, err
}
cfg, err := genConfig(ctx, sortPath, m.memRoot, hasUnique, resourceGroupName, concurrency)
cfg, err := genConfig(ctx, sortPath, m.memRoot, hasUnique, resourceGroupName, concurrency, maxWriteSpeed)
if err != nil {
logutil.Logger(ctx).Warn(LitWarnConfigError, zap.Int64("job ID", jobID), zap.Error(err))
return nil, err
3 changes: 2 additions & 1 deletion pkg/ddl/ingest/config.go
@@ -44,6 +44,7 @@ func genConfig(
unique bool,
resourceGroup string,
concurrency int,
maxWriteSpeed int,
) (*local.BackendConfig, error) {
cfg := &local.BackendConfig{
LocalStoreDir: jobSortPath,
@@ -68,7 +69,7 @@
PausePDSchedulerScope: lightning.PausePDSchedulerScopeTable,
TaskType: kvutil.ExplicitTypeDDL,
DisableAutomaticCompactions: true,
StoreWriteBWLimit: int(variable.DDLReorgMaxWriteSpeed.Load()),
StoreWriteBWLimit: maxWriteSpeed,
}
// Each backend will build a single dir in lightning dir.
if ImporterRangeConcurrencyForTest != nil {
2 changes: 1 addition & 1 deletion pkg/ddl/ingest/mock.go
@@ -57,7 +57,7 @@ func (m *MockBackendCtxMgr) CheckMoreTasksAvailable() (bool, error) {

// Register implements BackendCtxMgr.Register interface.
func (m *MockBackendCtxMgr) Register(ctx context.Context, jobID int64, unique bool, etcdClient *clientv3.Client,
pdSvcDiscovery pd.ServiceDiscovery, resourceGroupName string, importConc int, initTS uint64) (BackendCtx, error) {
pdSvcDiscovery pd.ServiceDiscovery, resourceGroupName string, importConc int, maxWriteSpeed int, initTS uint64) (BackendCtx, error) {
logutil.DDLIngestLogger().Info("mock backend mgr register", zap.Int64("jobID", jobID))
if mockCtx, ok := m.runningJobs[jobID]; ok {
return mockCtx, nil
6 changes: 6 additions & 0 deletions pkg/ddl/job_worker.go
@@ -867,13 +867,19 @@ func (w *worker) runOneJobStep(
if latestJob.IsAlterable() {
job.ReorgMeta.SetConcurrency(latestJob.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())))
job.ReorgMeta.SetBatchSize(latestJob.ReorgMeta.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize())))
job.ReorgMeta.SetMaxWriteSpeed(latestJob.ReorgMeta.GetMaxWriteSpeedOrDefault())
}
}
}
}
})
}
}
// When upgrading from a version where the ReorgMeta fields did not exist in the DDL job information,
// the unmarshalled job will have a nil value for the ReorgMeta field.
if w.tp == addIdxWorker && job.ReorgMeta == nil {
job.ReorgMeta = &model.DDLReorgMeta{}
}

prevState := job.State

20 changes: 13 additions & 7 deletions pkg/executor/operate_ddl_jobs.go
@@ -197,18 +197,24 @@ func (e *AlterDDLJobExec) processAlterDDLJobConfig(

func (e *AlterDDLJobExec) updateReorgMeta(job *model.Job, byWho model.AdminCommandOperator) error {
for _, opt := range e.AlterOpts {
if opt.Value == nil {
continue
}
switch opt.Name {
case core.AlterDDLJobThread:
if opt.Value != nil {
cons := opt.Value.(*expression.Constant)
job.ReorgMeta.SetConcurrency(int(cons.Value.GetInt64()))
}
cons := opt.Value.(*expression.Constant)
job.ReorgMeta.SetConcurrency(int(cons.Value.GetInt64()))
job.AdminOperator = byWho
case core.AlterDDLJobBatchSize:
if opt.Value != nil {
cons := opt.Value.(*expression.Constant)
job.ReorgMeta.SetBatchSize(int(cons.Value.GetInt64()))
cons := opt.Value.(*expression.Constant)
job.ReorgMeta.SetBatchSize(int(cons.Value.GetInt64()))
job.AdminOperator = byWho
case core.AlterDDLJobMaxWriteSpeed:
speed, err := core.GetMaxWriteSpeedFromExpression(opt)
if err != nil {
return err
}
job.ReorgMeta.SetMaxWriteSpeed(int(speed))
job.AdminOperator = byWho
default:
return errors.Errorf("unsupported admin alter ddl jobs config: %s", opt.Name)
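GetMaxWriteSpeedFromExpression has to accept both integer literals and human-readable size strings. The "invalid size" / "invalid suffix" messages asserted in db_test.go earlier in the diff match the docker/go-units parser (note that '2PiB' maps to 2251799813685248 bytes there), so the string form is presumably parsed with something like units.RAMInBytes. The standalone sketch below is an assumption about that parsing, not the PR's actual helper.

package main

import (
	"fmt"

	"github.com/docker/go-units" // assumed parser; its errors match the test expectations above
)

func main() {
	// Inputs taken from the test cases: valid sizes parse to a byte count
	// (binary units), malformed strings fail with "invalid size"/"invalid suffix".
	for _, in := range []string{"0", "2KB", "3MiB", "4 gb", "2PiB", "MiB", "20xl"} {
		n, err := units.RAMInBytes(in)
		if err != nil {
			fmt.Printf("%q -> error: %v\n", in, err)
			continue
		}
		fmt.Printf("%q -> %d bytes\n", in, n)
	}
}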
4 changes: 4 additions & 0 deletions pkg/executor/show_ddl_jobs.go
@@ -317,12 +317,16 @@ func showCommentsFromJob(job *model.Job) string {
if job.MayNeedReorg() {
concurrency := m.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter()))
batchSize := m.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize()))
maxWriteSpeed := m.GetMaxWriteSpeedOrDefault()
if concurrency != variable.DefTiDBDDLReorgWorkerCount {
labels = append(labels, fmt.Sprintf("thread=%d", concurrency))
}
if batchSize != variable.DefTiDBDDLReorgBatchSize {
labels = append(labels, fmt.Sprintf("batch_size=%d", batchSize))
}
if maxWriteSpeed != variable.DefTiDBDDLReorgMaxWriteSpeed {
labels = append(labels, fmt.Sprintf("max_write_speed=%d", maxWriteSpeed))
}
if m.TargetScope != "" {
labels = append(labels, fmt.Sprintf("service_scope=%s", m.TargetScope))
}
17 changes: 10 additions & 7 deletions pkg/executor/show_ddl_jobs_test.go
@@ -68,30 +68,33 @@ func TestShowCommentsFromJob(t *testing.T) {
ReorgTp: model.ReorgTypeLitMerge,
IsDistReorg: true,
UseCloudStorage: true,
Concurrency: 8,
BatchSize: 1024,
}
job.ReorgMeta.Concurrency.Store(8)
job.ReorgMeta.BatchSize.Store(1024)
job.ReorgMeta.MaxWriteSpeed.Store(1024 * 1024)
res = showCommentsFromJob(job)
require.Equal(t, "ingest, DXF, cloud, thread=8, batch_size=1024", res)
require.Equal(t, "ingest, DXF, cloud, thread=8, batch_size=1024, max_write_speed=1048576", res)

job.ReorgMeta = &model.DDLReorgMeta{
ReorgTp: model.ReorgTypeLitMerge,
IsDistReorg: true,
UseCloudStorage: true,
Concurrency: variable.DefTiDBDDLReorgWorkerCount,
BatchSize: variable.DefTiDBDDLReorgBatchSize,
}
job.ReorgMeta.Concurrency.Store(variable.DefTiDBDDLReorgWorkerCount)
job.ReorgMeta.BatchSize.Store(variable.DefTiDBDDLReorgBatchSize)
job.ReorgMeta.MaxWriteSpeed.Store(variable.DefTiDBDDLReorgMaxWriteSpeed)
res = showCommentsFromJob(job)
require.Equal(t, "ingest, DXF, cloud", res)

job.ReorgMeta = &model.DDLReorgMeta{
ReorgTp: model.ReorgTypeLitMerge,
IsDistReorg: true,
UseCloudStorage: true,
Concurrency: variable.DefTiDBDDLReorgWorkerCount,
BatchSize: variable.DefTiDBDDLReorgBatchSize,
TargetScope: "background",
}
job.ReorgMeta.Concurrency.Store(variable.DefTiDBDDLReorgWorkerCount)
job.ReorgMeta.BatchSize.Store(variable.DefTiDBDDLReorgBatchSize)
job.ReorgMeta.MaxWriteSpeed.Store(variable.DefTiDBDDLReorgMaxWriteSpeed)
res = showCommentsFromJob(job)
require.Equal(t, "ingest, DXF, cloud, service_scope=background", res)
}
7 changes: 1 addition & 6 deletions pkg/lightning/backend/local/local.go
@@ -618,12 +618,7 @@ func NewBackend(
return nil, common.ErrCheckMultiIngest.Wrap(err).GenWithStackByArgs()
}

var writeLimiter StoreWriteLimiter
if config.StoreWriteBWLimit > 0 {
writeLimiter = newStoreWriteLimiter(config.StoreWriteBWLimit)
} else {
writeLimiter = noopStoreWriteLimiter{}
}
writeLimiter := newStoreWriteLimiter(config.StoreWriteBWLimit)
local := &Backend{
pdCli: pdCli,
pdHTTPCli: pdHTTPCli,
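The NewBackend change drops the explicit noopStoreWriteLimiter branch, so newStoreWriteLimiter is presumably responsible for treating a non-positive limit as "no limit" itself, which is also what lets the limit be raised, lowered, or lifted at runtime via UpdateWriteSpeedLimit. Below is a hedged sketch of how such a store write limiter can be built on a token bucket; it illustrates the pattern only and is not lightning's actual StoreWriteLimiter.

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

// writeLimiter is an illustrative stand-in for a per-store write limiter.
type writeLimiter struct{ l *rate.Limiter }

// newWriteLimiter treats limit <= 0 as unlimited, which is the assumption
// behind removing the noop branch in the diff above.
func newWriteLimiter(bytesPerSec int) *writeLimiter {
	if bytesPerSec <= 0 {
		return &writeLimiter{l: rate.NewLimiter(rate.Inf, 0)}
	}
	return &writeLimiter{l: rate.NewLimiter(rate.Limit(bytesPerSec), bytesPerSec)}
}

// UpdateLimit adjusts the rate in place, the rough analogue of
// UpdateWriteSpeedLimit being called from the DDL adjuster goroutine.
func (w *writeLimiter) UpdateLimit(bytesPerSec int) {
	if bytesPerSec <= 0 {
		w.l.SetLimit(rate.Inf)
		return
	}
	w.l.SetLimit(rate.Limit(bytesPerSec))
	w.l.SetBurst(bytesPerSec)
}

// WaitN blocks until n bytes may be written to the store.
func (w *writeLimiter) WaitN(ctx context.Context, n int) error { return w.l.WaitN(ctx, n) }

func main() {
	lim := newWriteLimiter(1 << 20) // 1 MiB/s
	start := time.Now()
	for i := 0; i < 4; i++ {
		_ = lim.WaitN(context.Background(), 512<<10) // four 512 KiB "writes"
	}
	fmt.Println("limited:", time.Since(start).Round(100*time.Millisecond))

	lim.UpdateLimit(0) // lift the limit at runtime
	start = time.Now()
	_ = lim.WaitN(context.Background(), 512<<10)
	fmt.Println("unlimited:", time.Since(start).Round(time.Millisecond))
}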