Skip to content

Commit

Permalink
ddl: dynamically adjusting the max write speed of reorganization job (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
fzzf678 authored Nov 25, 2024
1 parent 06c6e40 commit ec8b81b
Show file tree
Hide file tree
Showing 26 changed files with 358 additions and 99 deletions.
47 changes: 31 additions & 16 deletions pkg/ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -708,8 +708,9 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode(
//nolint: forcetypeassert
discovery := dc.store.(tikv.Storage).GetRegionCache().PDClient().GetServiceDiscovery()
importConc := job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter()))
maxWriteSpeed := job.ReorgMeta.GetMaxWriteSpeedOrDefault()
bcCtx, err := ingest.LitBackCtxMgr.Register(
ctx, job.ID, hasUnique, nil, discovery, job.ReorgMeta.ResourceGroupName, importConc, job.RealStartTS)
ctx, job.ID, hasUnique, nil, discovery, job.ReorgMeta.ResourceGroupName, importConc, maxWriteSpeed, job.RealStartTS)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -776,7 +777,7 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode(
if err != nil {
return err
}
err = executeAndClosePipeline(opCtx, pipe, job, avgRowSize)
err = executeAndClosePipeline(opCtx, pipe, job, bcCtx, avgRowSize)
if err != nil {
err1 := bcCtx.FinishAndUnregisterEngines(ingest.OptCloseEngines)
if err1 != nil {
Expand All @@ -793,7 +794,7 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode(
return bcCtx.FinishAndUnregisterEngines(ingest.OptCleanData | ingest.OptCheckDup)
}

func adjustWorkerPoolSize(ctx context.Context, pipe *operator.AsyncPipeline, job *model.Job, avgRowSize int) {
func adjustWorkerCntAndMaxWriteSpeed(ctx context.Context, pipe *operator.AsyncPipeline, job *model.Job, bcCtx ingest.BackendCtx, avgRowSize int) {
opR, opW := pipe.GetLocalIngestModeReaderAndWriter()
if opR == nil || opW == nil {
logutil.DDLIngestLogger().Error("failed to get local ingest mode reader or writer", zap.Int64("jobID", job.ID))
Expand All @@ -817,36 +818,50 @@ func adjustWorkerPoolSize(ctx context.Context, pipe *operator.AsyncPipeline, job
case <-ctx.Done():
return
case <-ticker.C:
targetReaderCnt, targetWriterCnt := expectedIngestWorkerCnt(
job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())), avgRowSize)
maxWriteSpeed := job.ReorgMeta.GetMaxWriteSpeedOrDefault()
if maxWriteSpeed != bcCtx.GetLocalBackend().GetWriteSpeedLimit() {
bcCtx.GetLocalBackend().UpdateWriteSpeedLimit(maxWriteSpeed)
logutil.DDLIngestLogger().Info("adjust ddl job config success",
zap.Int64("jobID", job.ID),
zap.Int("max write speed", bcCtx.GetLocalBackend().GetWriteSpeedLimit()))
}

concurrency := job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter()))
targetReaderCnt, targetWriterCnt := expectedIngestWorkerCnt(concurrency, avgRowSize)
currentReaderCnt, currentWriterCnt := reader.GetWorkerPoolSize(), writer.GetWorkerPoolSize()
if int32(targetReaderCnt) == currentReaderCnt && int32(targetWriterCnt) == currentWriterCnt {
continue
if int32(targetReaderCnt) != currentReaderCnt || int32(targetWriterCnt) != currentWriterCnt {
reader.TuneWorkerPoolSize(int32(targetReaderCnt))
writer.TuneWorkerPoolSize(int32(targetWriterCnt))
logutil.DDLIngestLogger().Info("adjust ddl job config success",
zap.Int64("jobID", job.ID),
zap.Int32("table scan operator count", reader.GetWorkerPoolSize()),
zap.Int32("index ingest operator count", writer.GetWorkerPoolSize()))
}
reader.TuneWorkerPoolSize(int32(targetReaderCnt))
writer.TuneWorkerPoolSize(int32(targetWriterCnt))
logutil.DDLIngestLogger().Info("adjust ddl job config success",
zap.Int64("jobID", job.ID),
zap.Int32("table scan operator count", reader.GetWorkerPoolSize()),
zap.Int32("index ingest operator count", writer.GetWorkerPoolSize()))
}
}
}

func executeAndClosePipeline(ctx *OperatorCtx, pipe *operator.AsyncPipeline, job *model.Job, avgRowSize int) error {
func executeAndClosePipeline(ctx *OperatorCtx, pipe *operator.AsyncPipeline, job *model.Job, bcCtx ingest.BackendCtx, avgRowSize int) error {
err := pipe.Execute()
if err != nil {
return err
}

// Adjust worker pool size dynamically.
// Adjust worker pool size and max write speed dynamically.
var wg util.WaitGroupWrapper
adjustCtx, cancel := context.WithCancel(ctx)
if job != nil {
wg.Add(1)
go func() {
adjustWorkerPoolSize(ctx, pipe, job, avgRowSize)
defer wg.Done()
adjustWorkerCntAndMaxWriteSpeed(adjustCtx, pipe, job, bcCtx, avgRowSize)
}()
}

err = pipe.Close()

cancel()
wg.Wait() // wait for adjustWorkerCntAndMaxWriteSpeed to exit
if opErr := ctx.OperatorErr(); opErr != nil {
return opErr
}
Expand Down
1 change: 1 addition & 0 deletions pkg/ddl/backfilling_dist_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ func (s *backfillDistExecutor) getBackendCtx() (ingest.BackendCtx, error) {
discovery,
job.ReorgMeta.ResourceGroupName,
job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())),
job.ReorgMeta.GetMaxWriteSpeedOrDefault(),
job.RealStartTS,
)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/ddl/backfilling_read_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,14 @@ func (r *readIndexExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta
if err != nil {
return err
}
return executeAndClosePipeline(opCtx, pipe, nil, 0)
return executeAndClosePipeline(opCtx, pipe, nil, nil, 0)
}

pipe, err := r.buildLocalStorePipeline(opCtx, sm, subtask.Concurrency)
if err != nil {
return err
}
err = executeAndClosePipeline(opCtx, pipe, nil, 0)
err = executeAndClosePipeline(opCtx, pipe, nil, nil, 0)
if err != nil {
// For dist task local based ingest, checkpoint is unsupported.
// If there is an error we should keep local sort dir clean.
Expand Down
7 changes: 3 additions & 4 deletions pkg/ddl/backfilling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,10 +489,9 @@ func TestValidateAndFillRanges(t *testing.T) {
}

func TestTuneTableScanWorkerBatchSize(t *testing.T) {
reorgMeta := &model.DDLReorgMeta{
Concurrency: 4,
BatchSize: 32,
}
reorgMeta := &model.DDLReorgMeta{}
reorgMeta.Concurrency.Store(4)
reorgMeta.BatchSize.Store(32)
copCtx := &copr.CopContextSingleIndex{
CopContextBase: &copr.CopContextBase{
FieldTypes: []*types.FieldType{},
Expand Down
48 changes: 36 additions & 12 deletions pkg/ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1161,13 +1161,12 @@ func TestAdminAlterDDLJobUpdateSysTable(t *testing.T) {
tk.MustExec("create table t (a int);")

job := model.Job{
ID: 1,
Type: model.ActionAddIndex,
ReorgMeta: &model.DDLReorgMeta{
Concurrency: 4,
BatchSize: 128,
},
ID: 1,
Type: model.ActionAddIndex,
ReorgMeta: &model.DDLReorgMeta{},
}
job.ReorgMeta.Concurrency.Store(4)
job.ReorgMeta.BatchSize.Store(128)
insertMockJob2Table(tk, &job)
tk.MustExec(fmt.Sprintf("admin alter ddl jobs %d thread = 8;", job.ID))
j := getJobMetaByID(t, tk, job.ID)
Expand All @@ -1193,8 +1192,34 @@ func TestAdminAlterDDLJobUnsupportedCases(t *testing.T) {
// invalid config value
tk.MustGetErrMsg("admin alter ddl jobs 1 thread = 0;", "the value 0 for thread is out of range [1, 256]")
tk.MustGetErrMsg("admin alter ddl jobs 1 thread = 257;", "the value 257 for thread is out of range [1, 256]")
tk.MustGetErrMsg("admin alter ddl jobs 1 thread = 10.5;", "the value for thread is invalid, only integer is allowed")
tk.MustGetErrMsg("admin alter ddl jobs 1 thread = '16';", "the value for thread is invalid, only integer is allowed")
tk.MustGetErrMsg("admin alter ddl jobs 1 thread = '';", "the value for thread is invalid, only integer is allowed")
tk.MustGetErrMsg("admin alter ddl jobs 1 batch_size = 31;", "the value 31 for batch_size is out of range [32, 10240]")
tk.MustGetErrMsg("admin alter ddl jobs 1 batch_size = 10241;", "the value 10241 for batch_size is out of range [32, 10240]")
tk.MustGetErrMsg("admin alter ddl jobs 1 batch_size = 321.3;", "the value for batch_size is invalid, only integer is allowed")
tk.MustGetErrMsg("admin alter ddl jobs 1 batch_size = '512';", "the value for batch_size is invalid, only integer is allowed")
tk.MustGetErrMsg("admin alter ddl jobs 1 batch_size = '';", "the value for batch_size is invalid, only integer is allowed")
tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = '2PiB';", "the value 2251799813685248 for max_write_speed is out of range [0, 1125899906842624]")
tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = -1;", "the value -1 for max_write_speed is out of range [0, 1125899906842624]")
tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = 1.23;", "the value 1.23 for max_write_speed is invalid")
tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = 'MiB';", "parse max_write_speed value error: invalid size: 'MiB'")
tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = 'asd';", "parse max_write_speed value error: invalid size: 'asd'")
tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = '';", "parse max_write_speed value error: invalid size: ''")
tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = '20xl';", "parse max_write_speed value error: invalid suffix: 'xl'")
tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = 1.2.3;", "[parser:1064]You have an error in your SQL syntax; check the manual that corresponds to your TiDB version for the right syntax to use line 1 column 46 near \".3;\" ")
tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = 20+30;", "[parser:1064]You have an error in your SQL syntax; check the manual that corresponds to your TiDB version for the right syntax to use line 1 column 44 near \"+30;\" ")
tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = rand();", "[parser:1064]You have an error in your SQL syntax; check the manual that corresponds to your TiDB version for the right syntax to use line 1 column 45 near \"rand();\" ")
// valid config value
tk.MustGetErrMsg("admin alter ddl jobs 1 thread = 16;", "ddl job 1 is not running")
tk.MustGetErrMsg("admin alter ddl jobs 1 batch_size = 64;", "ddl job 1 is not running")
tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = '0';", "ddl job 1 is not running")
tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = '64';", "ddl job 1 is not running")
tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = '2KB';", "ddl job 1 is not running")
tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = '3MiB';", "ddl job 1 is not running")
tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = '4 gb';", "ddl job 1 is not running")
tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = 1;", "ddl job 1 is not running")
tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = '1.23';", "ddl job 1 is not running")

// invalid job id
tk.MustGetErrMsg("admin alter ddl jobs 1 thread = 8;", "ddl job 1 is not running")
Expand Down Expand Up @@ -1232,13 +1257,12 @@ func TestAdminAlterDDLJobCommitFailed(t *testing.T) {
defer testfailpoint.Disable(t, "github.com/pingcap/tidb/pkg/executor/mockAlterDDLJobCommitFailed")

job := model.Job{
ID: 1,
Type: model.ActionAddIndex,
ReorgMeta: &model.DDLReorgMeta{
Concurrency: 4,
BatchSize: 128,
},
ID: 1,
Type: model.ActionAddIndex,
ReorgMeta: &model.DDLReorgMeta{},
}
job.ReorgMeta.Concurrency.Store(4)
job.ReorgMeta.BatchSize.Store(128)
insertMockJob2Table(tk, &job)
tk.MustGetErrMsg(fmt.Sprintf("admin alter ddl jobs %d thread = 8, batch_size = 256;", job.ID),
"mock commit failed on admin alter ddl jobs")
Expand Down
1 change: 1 addition & 0 deletions pkg/ddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4928,6 +4928,7 @@ func initJobReorgMetaFromVariables(job *model.Job, sctx sessionctx.Context) erro
if sv, ok := sctx.GetSessionVars().GetSystemVar(variable.TiDBDDLReorgBatchSize); ok {
m.SetBatchSize(variable.TidbOptInt(sv, 0))
}
m.SetMaxWriteSpeed(int(variable.DDLReorgMaxWriteSpeed.Load()))
}
setDistTaskParam := func() error {
m.IsDistReorg = variable.EnableDistTask.Load()
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -2429,7 +2429,7 @@ func checkDuplicateForUniqueIndex(ctx context.Context, t table.Table, reorgInfo
ctx := tidblogutil.WithCategory(ctx, "ddl-ingest")
if bc == nil {
bc, err = ingest.LitBackCtxMgr.Register(
ctx, reorgInfo.ID, indexInfo.Unique, nil, discovery, reorgInfo.ReorgMeta.ResourceGroupName, 1, reorgInfo.RealStartTS)
ctx, reorgInfo.ID, indexInfo.Unique, nil, discovery, reorgInfo.ReorgMeta.ResourceGroupName, 1, 0, reorgInfo.RealStartTS)
if err != nil {
return err
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/ddl/ingest/backend_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type BackendCtxMgr interface {
pdSvcDiscovery pd.ServiceDiscovery,
resourceGroupName string,
importConc int,
maxWriteSpeed int,
initTS uint64,
) (BackendCtx, error)
Unregister(jobID int64)
Expand Down Expand Up @@ -118,6 +119,7 @@ func (m *litBackendCtxMgr) Register(
pdSvcDiscovery pd.ServiceDiscovery,
resourceGroupName string,
concurrency int,
maxWriteSpeed int,
initTS uint64,
) (BackendCtx, error) {
bc, exist := m.Load(jobID)
Expand All @@ -136,7 +138,7 @@ func (m *litBackendCtxMgr) Register(
logutil.Logger(ctx).Error(LitErrCreateDirFail, zap.Error(err))
return nil, err
}
cfg, err := genConfig(ctx, sortPath, m.memRoot, hasUnique, resourceGroupName, concurrency)
cfg, err := genConfig(ctx, sortPath, m.memRoot, hasUnique, resourceGroupName, concurrency, maxWriteSpeed)
if err != nil {
logutil.Logger(ctx).Warn(LitWarnConfigError, zap.Int64("job ID", jobID), zap.Error(err))
return nil, err
Expand Down
3 changes: 2 additions & 1 deletion pkg/ddl/ingest/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func genConfig(
unique bool,
resourceGroup string,
concurrency int,
maxWriteSpeed int,
) (*local.BackendConfig, error) {
cfg := &local.BackendConfig{
LocalStoreDir: jobSortPath,
Expand All @@ -68,7 +69,7 @@ func genConfig(
PausePDSchedulerScope: lightning.PausePDSchedulerScopeTable,
TaskType: kvutil.ExplicitTypeDDL,
DisableAutomaticCompactions: true,
StoreWriteBWLimit: int(variable.DDLReorgMaxWriteSpeed.Load()),
StoreWriteBWLimit: maxWriteSpeed,
}
// Each backend will build a single dir in lightning dir.
if ImporterRangeConcurrencyForTest != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/ingest/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (m *MockBackendCtxMgr) CheckMoreTasksAvailable() (bool, error) {

// Register implements BackendCtxMgr.Register interface.
func (m *MockBackendCtxMgr) Register(ctx context.Context, jobID int64, unique bool, etcdClient *clientv3.Client,
pdSvcDiscovery pd.ServiceDiscovery, resourceGroupName string, importConc int, initTS uint64) (BackendCtx, error) {
pdSvcDiscovery pd.ServiceDiscovery, resourceGroupName string, importConc int, maxWriteSpeed int, initTS uint64) (BackendCtx, error) {
logutil.DDLIngestLogger().Info("mock backend mgr register", zap.Int64("jobID", jobID))
if mockCtx, ok := m.runningJobs[jobID]; ok {
return mockCtx, nil
Expand Down
6 changes: 6 additions & 0 deletions pkg/ddl/job_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -867,13 +867,19 @@ func (w *worker) runOneJobStep(
if latestJob.IsAlterable() {
job.ReorgMeta.SetConcurrency(latestJob.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())))
job.ReorgMeta.SetBatchSize(latestJob.ReorgMeta.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize())))
job.ReorgMeta.SetMaxWriteSpeed(latestJob.ReorgMeta.GetMaxWriteSpeedOrDefault())
}
}
}
}
})
}
}
// When upgrading from a version where the ReorgMeta fields did not exist in the DDL job information,
// the unmarshalled job will have a nil value for the ReorgMeta field.
if w.tp == addIdxWorker && job.ReorgMeta == nil {
job.ReorgMeta = &model.DDLReorgMeta{}
}

prevState := job.State

Expand Down
20 changes: 13 additions & 7 deletions pkg/executor/operate_ddl_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,18 +197,24 @@ func (e *AlterDDLJobExec) processAlterDDLJobConfig(

func (e *AlterDDLJobExec) updateReorgMeta(job *model.Job, byWho model.AdminCommandOperator) error {
for _, opt := range e.AlterOpts {
if opt.Value == nil {
continue
}
switch opt.Name {
case core.AlterDDLJobThread:
if opt.Value != nil {
cons := opt.Value.(*expression.Constant)
job.ReorgMeta.SetConcurrency(int(cons.Value.GetInt64()))
}
cons := opt.Value.(*expression.Constant)
job.ReorgMeta.SetConcurrency(int(cons.Value.GetInt64()))
job.AdminOperator = byWho
case core.AlterDDLJobBatchSize:
if opt.Value != nil {
cons := opt.Value.(*expression.Constant)
job.ReorgMeta.SetBatchSize(int(cons.Value.GetInt64()))
cons := opt.Value.(*expression.Constant)
job.ReorgMeta.SetBatchSize(int(cons.Value.GetInt64()))
job.AdminOperator = byWho
case core.AlterDDLJobMaxWriteSpeed:
speed, err := core.GetMaxWriteSpeedFromExpression(opt)
if err != nil {
return err
}
job.ReorgMeta.SetMaxWriteSpeed(int(speed))
job.AdminOperator = byWho
default:
return errors.Errorf("unsupported admin alter ddl jobs config: %s", opt.Name)
Expand Down
4 changes: 4 additions & 0 deletions pkg/executor/show_ddl_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,12 +317,16 @@ func showCommentsFromJob(job *model.Job) string {
if job.MayNeedReorg() {
concurrency := m.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter()))
batchSize := m.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize()))
maxWriteSpeed := m.GetMaxWriteSpeedOrDefault()
if concurrency != variable.DefTiDBDDLReorgWorkerCount {
labels = append(labels, fmt.Sprintf("thread=%d", concurrency))
}
if batchSize != variable.DefTiDBDDLReorgBatchSize {
labels = append(labels, fmt.Sprintf("batch_size=%d", batchSize))
}
if maxWriteSpeed != variable.DefTiDBDDLReorgMaxWriteSpeed {
labels = append(labels, fmt.Sprintf("max_write_speed=%d", maxWriteSpeed))
}
if m.TargetScope != "" {
labels = append(labels, fmt.Sprintf("service_scope=%s", m.TargetScope))
}
Expand Down
17 changes: 10 additions & 7 deletions pkg/executor/show_ddl_jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,30 +68,33 @@ func TestShowCommentsFromJob(t *testing.T) {
ReorgTp: model.ReorgTypeLitMerge,
IsDistReorg: true,
UseCloudStorage: true,
Concurrency: 8,
BatchSize: 1024,
}
job.ReorgMeta.Concurrency.Store(8)
job.ReorgMeta.BatchSize.Store(1024)
job.ReorgMeta.MaxWriteSpeed.Store(1024 * 1024)
res = showCommentsFromJob(job)
require.Equal(t, "ingest, DXF, cloud, thread=8, batch_size=1024", res)
require.Equal(t, "ingest, DXF, cloud, thread=8, batch_size=1024, max_write_speed=1048576", res)

job.ReorgMeta = &model.DDLReorgMeta{
ReorgTp: model.ReorgTypeLitMerge,
IsDistReorg: true,
UseCloudStorage: true,
Concurrency: variable.DefTiDBDDLReorgWorkerCount,
BatchSize: variable.DefTiDBDDLReorgBatchSize,
}
job.ReorgMeta.Concurrency.Store(variable.DefTiDBDDLReorgWorkerCount)
job.ReorgMeta.BatchSize.Store(variable.DefTiDBDDLReorgBatchSize)
job.ReorgMeta.MaxWriteSpeed.Store(variable.DefTiDBDDLReorgMaxWriteSpeed)
res = showCommentsFromJob(job)
require.Equal(t, "ingest, DXF, cloud", res)

job.ReorgMeta = &model.DDLReorgMeta{
ReorgTp: model.ReorgTypeLitMerge,
IsDistReorg: true,
UseCloudStorage: true,
Concurrency: variable.DefTiDBDDLReorgWorkerCount,
BatchSize: variable.DefTiDBDDLReorgBatchSize,
TargetScope: "background",
}
job.ReorgMeta.Concurrency.Store(variable.DefTiDBDDLReorgWorkerCount)
job.ReorgMeta.BatchSize.Store(variable.DefTiDBDDLReorgBatchSize)
job.ReorgMeta.MaxWriteSpeed.Store(variable.DefTiDBDDLReorgMaxWriteSpeed)
res = showCommentsFromJob(job)
require.Equal(t, "ingest, DXF, cloud, service_scope=background", res)
}
Expand Down
7 changes: 1 addition & 6 deletions pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -637,12 +637,7 @@ func NewBackend(
return nil, common.ErrCheckMultiIngest.Wrap(err).GenWithStackByArgs()
}

var writeLimiter StoreWriteLimiter
if config.StoreWriteBWLimit > 0 {
writeLimiter = newStoreWriteLimiter(config.StoreWriteBWLimit)
} else {
writeLimiter = noopStoreWriteLimiter{}
}
writeLimiter := newStoreWriteLimiter(config.StoreWriteBWLimit)
local := &Backend{
pdCli: pdCli,
pdHTTPCli: pdHTTPCli,
Expand Down
Loading

0 comments on commit ec8b81b

Please sign in to comment.