ddl: dynamically adjusting the max write speed of reorganization job #57611

Merged
merged 44 commits on Nov 25, 2024
Commits (44)
745c0bb
build execute max_write_speed
fzzf678 Nov 19, 2024
2bdcee7
Merge remote-tracking branch 'upstream/master' into polling_bwlimit
fzzf678 Nov 21, 2024
f286233
use get and set interface handle the max write speed in reorg meta
fzzf678 Nov 21, 2024
cbbd6ab
make the limiter can be adjusted
fzzf678 Nov 21, 2024
829c67f
adjust max write speed
fzzf678 Nov 21, 2024
07869e3
Update BUILD.bazel
fzzf678 Nov 21, 2024
b7eed57
fix nogo
fzzf678 Nov 22, 2024
3e3955f
fix absurd
fzzf678 Nov 22, 2024
fb8422b
show ddl jobs print max write speed
fzzf678 Nov 22, 2024
cc98597
rename
fzzf678 Nov 22, 2024
bb381f9
Update pkg/meta/model/reorg.go
fzzf678 Nov 22, 2024
eb01652
Merge branch 'polling_bwlimit' of github.com:fzzf678/tidb into pollin…
fzzf678 Nov 22, 2024
50c4334
wait goroutine exit
fzzf678 Nov 22, 2024
7d46d9e
support int type receive value
fzzf678 Nov 22, 2024
7345329
handle the nil case in Set
fzzf678 Nov 22, 2024
236a688
Update planbuilder.go
fzzf678 Nov 22, 2024
15e2726
Update planbuilder.go
fzzf678 Nov 22, 2024
8c07af0
add test
fzzf678 Nov 22, 2024
1f9a46c
fix test
fzzf678 Nov 22, 2024
543a85e
get the speed after update
fzzf678 Nov 24, 2024
5fbf892
Merge remote-tracking branch 'upstream/master' into polling_bwlimit
fzzf678 Nov 25, 2024
c1ea4f0
Update pkg/lightning/backend/local/localhelper_test.go
fzzf678 Nov 25, 2024
fc28a05
fix some coment
fzzf678 Nov 25, 2024
62cb8f3
merge duplicate code
fzzf678 Nov 25, 2024
1b25f9a
handle the reorg meta nil case when unmarshalling
fzzf678 Nov 25, 2024
845615c
Merge branch 'polling_bwlimit' of github.com:fzzf678/tidb into pollin…
fzzf678 Nov 25, 2024
3173317
use uber's atomic
fzzf678 Nov 25, 2024
1158823
update bazel
fzzf678 Nov 25, 2024
a128426
handle the reorgmeta nil
fzzf678 Nov 25, 2024
2a29bd5
remove useless param
fzzf678 Nov 25, 2024
0252822
Update pkg/planner/core/planbuilder_test.go
fzzf678 Nov 25, 2024
4237d3c
Update pkg/planner/core/planbuilder_test.go
fzzf678 Nov 25, 2024
3900fe9
Update pkg/planner/core/planbuilder.go
fzzf678 Nov 25, 2024
941af08
use require.Len()
fzzf678 Nov 25, 2024
f7cce41
use named return value
fzzf678 Nov 25, 2024
d71a826
add test and fix comment
fzzf678 Nov 25, 2024
5ba4278
Update planbuilder_test.go
fzzf678 Nov 25, 2024
a9cda92
check thread and batch_size eval type
fzzf678 Nov 25, 2024
00d0027
Update BUILD.bazel
fzzf678 Nov 25, 2024
a1cc392
correct maxTokens formula
fzzf678 Nov 25, 2024
becf32e
remove handle reorg nil case when decode
fzzf678 Nov 25, 2024
edcd1e9
Update pkg/planner/core/planbuilder.go
fzzf678 Nov 25, 2024
417432c
Merge branch 'polling_bwlimit' of github.com:fzzf678/tidb into pollin…
fzzf678 Nov 25, 2024
48f1831
Merge remote-tracking branch 'upstream/master' into polling_bwlimit
fzzf678 Nov 25, 2024
Files changed (changes from 9 commits)
38 changes: 23 additions & 15 deletions pkg/ddl/backfilling.go
@@ -708,8 +708,9 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode(
//nolint: forcetypeassert
discovery := dc.store.(tikv.Storage).GetRegionCache().PDClient().GetServiceDiscovery()
importConc := job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter()))
maxWriteSpeed := job.ReorgMeta.GetMaxWriteSpeedOrDefault(int(variable.DDLReorgMaxWriteSpeed.Load()))
bcCtx, err := ingest.LitBackCtxMgr.Register(
ctx, job.ID, hasUnique, nil, discovery, job.ReorgMeta.ResourceGroupName, importConc, job.RealStartTS)
ctx, job.ID, hasUnique, nil, discovery, job.ReorgMeta.ResourceGroupName, importConc, maxWriteSpeed, job.RealStartTS)
if err != nil {
return errors.Trace(err)
}
@@ -776,7 +777,7 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode(
if err != nil {
return err
}
err = executeAndClosePipeline(opCtx, pipe, job, avgRowSize)
err = executeAndClosePipeline(opCtx, pipe, job, bcCtx, avgRowSize)
if err != nil {
err1 := bcCtx.FinishAndUnregisterEngines(ingest.OptCloseEngines)
if err1 != nil {
@@ -793,7 +794,7 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode(
return bcCtx.FinishAndUnregisterEngines(ingest.OptCleanData | ingest.OptCheckDup)
}

func adjustWorkerPoolSize(ctx context.Context, pipe *operator.AsyncPipeline, job *model.Job, avgRowSize int) {
func adjustWorkerPoolSize(ctx context.Context, pipe *operator.AsyncPipeline, job *model.Job, bcCtx ingest.BackendCtx, avgRowSize int) {
opR, opW := pipe.GetLocalIngestModeReaderAndWriter()
if opR == nil || opW == nil {
logutil.DDLIngestLogger().Error("failed to get local ingest mode reader or writer", zap.Int64("jobID", job.ID))
@@ -817,23 +818,30 @@ func adjustWorkerPoolSize(ctx context.Context, pipe *operator.AsyncPipeline, job
case <-ctx.Done():
return
case <-ticker.C:
targetReaderCnt, targetWriterCnt := expectedIngestWorkerCnt(
job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())), avgRowSize)
maxWriteSpeed := job.ReorgMeta.GetMaxWriteSpeedOrDefault(int(variable.DDLReorgMaxWriteSpeed.Load()))
if maxWriteSpeed != bcCtx.GetLocalBackend().GetLimiterSpeed() {
bcCtx.GetLocalBackend().UpdateLimiter(maxWriteSpeed)
logutil.DDLIngestLogger().Info("adjust ddl job config success",
zap.Int64("jobID", job.ID),
zap.Int("max write speed", maxWriteSpeed))
}

concurrency := job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter()))
targetReaderCnt, targetWriterCnt := expectedIngestWorkerCnt(concurrency, avgRowSize)
currentReaderCnt, currentWriterCnt := reader.GetWorkerPoolSize(), writer.GetWorkerPoolSize()
if int32(targetReaderCnt) == currentReaderCnt && int32(targetWriterCnt) == currentWriterCnt {
continue
if int32(targetReaderCnt) != currentReaderCnt || int32(targetWriterCnt) != currentWriterCnt {
reader.TuneWorkerPoolSize(int32(targetReaderCnt))
writer.TuneWorkerPoolSize(int32(targetWriterCnt))
logutil.DDLIngestLogger().Info("adjust ddl job config success",
zap.Int64("jobID", job.ID),
zap.Int32("table scan operator count", reader.GetWorkerPoolSize()),
zap.Int32("index ingest operator count", writer.GetWorkerPoolSize()))
}
reader.TuneWorkerPoolSize(int32(targetReaderCnt))
writer.TuneWorkerPoolSize(int32(targetWriterCnt))
logutil.DDLIngestLogger().Info("adjust ddl job config success",
zap.Int64("jobID", job.ID),
zap.Int32("table scan operator count", reader.GetWorkerPoolSize()),
zap.Int32("index ingest operator count", writer.GetWorkerPoolSize()))
}
}
}

func executeAndClosePipeline(ctx *OperatorCtx, pipe *operator.AsyncPipeline, job *model.Job, avgRowSize int) error {
func executeAndClosePipeline(ctx *OperatorCtx, pipe *operator.AsyncPipeline, job *model.Job, bcCtx ingest.BackendCtx, avgRowSize int) error {
err := pipe.Execute()
if err != nil {
return err
@@ -842,7 +850,7 @@ func executeAndClosePipeline(ctx *OperatorCtx, pipe *operator.AsyncPipeline, job
// Adjust worker pool size dynamically.
if job != nil {
go func() {
adjustWorkerPoolSize(ctx, pipe, job, avgRowSize)
adjustWorkerPoolSize(ctx, pipe, job, bcCtx, avgRowSize)
}()
}

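Note: the heart of this hunk is a polling pattern — on every ticker fire, the goroutine re-reads the desired speed from job.ReorgMeta and, if it differs from what the backend limiter currently uses, pushes the new value in via UpdateLimiter. Below is a minimal, self-contained sketch of that pattern using golang.org/x/time/rate in place of the backend's own StoreWriteLimiter; targetSpeed is a stand-in for GetMaxWriteSpeedOrDefault and is not TiDB code.

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"

	"golang.org/x/time/rate"
)

// targetSpeed stands in for job.ReorgMeta.GetMaxWriteSpeedOrDefault(...);
// in TiDB it would be updated by ADMIN ALTER DDL JOBS.
var targetSpeed atomic.Int64

func adjustLimiterLoop(ctx context.Context, limiter *rate.Limiter) {
	ticker := time.NewTicker(2 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			want := rate.Limit(targetSpeed.Load())
			if want != limiter.Limit() {
				// A real limiter would typically resize the burst as well.
				limiter.SetLimit(want) // takes effect for subsequent Wait/Reserve calls
				fmt.Println("adjusted max write speed to", int64(want))
			}
		}
	}
}

func main() {
	limiter := rate.NewLimiter(rate.Limit(10<<20), 10<<20) // 10 MiB/s initial
	targetSpeed.Store(10 << 20)
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	go adjustLimiterLoop(ctx, limiter)
	time.Sleep(time.Second)
	targetSpeed.Store(20 << 20) // e.g. raised by the admin command
	<-ctx.Done()
}
```

In the actual change the same ticker also retunes the reader/writer worker pools, so one loop drives both adjustments.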
1 change: 1 addition & 0 deletions pkg/ddl/backfilling_dist_executor.go
@@ -154,6 +154,7 @@ func (s *backfillDistExecutor) getBackendCtx() (ingest.BackendCtx, error) {
discovery,
job.ReorgMeta.ResourceGroupName,
job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())),
job.ReorgMeta.GetMaxWriteSpeedOrDefault(int(variable.DDLReorgMaxWriteSpeed.Load())),
job.RealStartTS,
)
}
4 changes: 2 additions & 2 deletions pkg/ddl/backfilling_read_index.go
@@ -114,14 +114,14 @@ func (r *readIndexExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta
if err != nil {
return err
}
return executeAndClosePipeline(opCtx, pipe, nil, 0)
return executeAndClosePipeline(opCtx, pipe, nil, nil, 0)
}

pipe, err := r.buildLocalStorePipeline(opCtx, sm, subtask.Concurrency)
if err != nil {
return err
}
err = executeAndClosePipeline(opCtx, pipe, nil, 0)
err = executeAndClosePipeline(opCtx, pipe, nil, nil, 0)
if err != nil {
// For dist task local based ingest, checkpoint is unsupported.
// If there is an error we should keep local sort dir clean.
9 changes: 9 additions & 0 deletions pkg/ddl/db_test.go
@@ -1195,6 +1195,15 @@ func TestAdminAlterDDLJobUnsupportedCases(t *testing.T) {
tk.MustGetErrMsg("admin alter ddl jobs 1 thread = 257;", "the value 257 for thread is out of range [1, 256]")
tk.MustGetErrMsg("admin alter ddl jobs 1 batch_size = 31;", "the value 31 for batch_size is out of range [32, 10240]")
tk.MustGetErrMsg("admin alter ddl jobs 1 batch_size = 10241;", "the value 10241 for batch_size is out of range [32, 10240]")
tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = '2PiB';", "the value 2PiB for max_write_speed is out of range [0, 1125899906842624]")
// valid config value
tk.MustGetErrMsg("admin alter ddl jobs 1 thread = 16;", "ddl job 1 is not running")
tk.MustGetErrMsg("admin alter ddl jobs 1 batch_size = 64;", "ddl job 1 is not running")
tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = '0';", "ddl job 1 is not running")
tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = '64';", "ddl job 1 is not running")
tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = '2KB';", "ddl job 1 is not running")
tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = '3MiB';", "ddl job 1 is not running")
tk.MustGetErrMsg("admin alter ddl jobs 1 max_write_speed = '4 gb';", "ddl job 1 is not running")

// invalid job id
tk.MustGetErrMsg("admin alter ddl jobs 1 thread = 8;", "ddl job 1 is not running")
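For context, these cases exercise the user-facing side of the change: the limit is set per running job through ADMIN ALTER DDL JOBS, accepting either a plain byte count ('0' disables throttling) or a human-readable size, bounded above by 1 PiB. A hedged usage sketch from an ordinary MySQL-protocol client follows; the DSN and job ID 123 are placeholders, and the statement fails with "ddl job ... is not running" once the job has finished, as the test above shows.

```go
package main

import (
	"database/sql"
	"log"

	_ "github.com/go-sql-driver/mysql"
)

func main() {
	db, err := sql.Open("mysql", "root@tcp(127.0.0.1:4000)/test") // placeholder DSN
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Throttle the running reorganization job 123 to roughly 32 MiB/s per TiKV store.
	// Accepted forms include plain byte counts and human-readable sizes such as
	// '2KB', '3MiB', or '4 gb'; '0' removes the limit.
	if _, err := db.Exec("ADMIN ALTER DDL JOBS 123 max_write_speed = '32MiB'"); err != nil {
		log.Fatal(err)
	}
}
```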
1 change: 1 addition & 0 deletions pkg/ddl/executor.go
@@ -4928,6 +4928,7 @@ func initJobReorgMetaFromVariables(job *model.Job, sctx sessionctx.Context) erro
if sv, ok := sctx.GetSessionVars().GetSystemVar(variable.TiDBDDLReorgBatchSize); ok {
m.SetBatchSize(variable.TidbOptInt(sv, 0))
}
m.SetMaxWriteSpeed(int(variable.DDLReorgMaxWriteSpeed.Load()))
}
setDistTaskParam := func() error {
m.IsDistReorg = variable.EnableDistTask.Load()
2 changes: 1 addition & 1 deletion pkg/ddl/index.go
@@ -2429,7 +2429,7 @@ func checkDuplicateForUniqueIndex(ctx context.Context, t table.Table, reorgInfo
ctx := tidblogutil.WithCategory(ctx, "ddl-ingest")
if bc == nil {
bc, err = ingest.LitBackCtxMgr.Register(
ctx, reorgInfo.ID, indexInfo.Unique, nil, discovery, reorgInfo.ReorgMeta.ResourceGroupName, 1, reorgInfo.RealStartTS)
ctx, reorgInfo.ID, indexInfo.Unique, nil, discovery, reorgInfo.ReorgMeta.ResourceGroupName, 1, 0, reorgInfo.RealStartTS)
if err != nil {
return err
}
4 changes: 3 additions & 1 deletion pkg/ddl/ingest/backend_mgr.go
@@ -51,6 +51,7 @@ type BackendCtxMgr interface {
pdSvcDiscovery pd.ServiceDiscovery,
resourceGroupName string,
importConc int,
maxWriteSpeed int,
initTS uint64,
) (BackendCtx, error)
Unregister(jobID int64)
@@ -118,6 +119,7 @@ func (m *litBackendCtxMgr) Register(
pdSvcDiscovery pd.ServiceDiscovery,
resourceGroupName string,
concurrency int,
maxWriteSpeed int,
initTS uint64,
) (BackendCtx, error) {
bc, exist := m.Load(jobID)
@@ -136,7 +138,7 @@ func (m *litBackendCtxMgr) Register(
logutil.Logger(ctx).Error(LitErrCreateDirFail, zap.Error(err))
return nil, err
}
cfg, err := genConfig(ctx, sortPath, m.memRoot, hasUnique, resourceGroupName, concurrency)
cfg, err := genConfig(ctx, sortPath, m.memRoot, hasUnique, resourceGroupName, concurrency, maxWriteSpeed)
if err != nil {
logutil.Logger(ctx).Warn(LitWarnConfigError, zap.Int64("job ID", jobID), zap.Error(err))
return nil, err
3 changes: 2 additions & 1 deletion pkg/ddl/ingest/config.go
@@ -44,6 +44,7 @@ func genConfig(
unique bool,
resourceGroup string,
concurrency int,
maxWriteSpeed int,
) (*local.BackendConfig, error) {
cfg := &local.BackendConfig{
LocalStoreDir: jobSortPath,
@@ -68,7 +69,7 @@
PausePDSchedulerScope: lightning.PausePDSchedulerScopeTable,
TaskType: kvutil.ExplicitTypeDDL,
DisableAutomaticCompactions: true,
StoreWriteBWLimit: int(variable.DDLReorgMaxWriteSpeed.Load()),
StoreWriteBWLimit: maxWriteSpeed,
}
// Each backend will build a single dir in lightning dir.
if ImporterRangeConcurrencyForTest != nil {
2 changes: 1 addition & 1 deletion pkg/ddl/ingest/mock.go
@@ -57,7 +57,7 @@ func (m *MockBackendCtxMgr) CheckMoreTasksAvailable() (bool, error) {

// Register implements BackendCtxMgr.Register interface.
func (m *MockBackendCtxMgr) Register(ctx context.Context, jobID int64, unique bool, etcdClient *clientv3.Client,
pdSvcDiscovery pd.ServiceDiscovery, resourceGroupName string, importConc int, initTS uint64) (BackendCtx, error) {
pdSvcDiscovery pd.ServiceDiscovery, resourceGroupName string, importConc int, maxWriteSpeed int, initTS uint64) (BackendCtx, error) {
logutil.DDLIngestLogger().Info("mock backend mgr register", zap.Int64("jobID", jobID))
if mockCtx, ok := m.runningJobs[jobID]; ok {
return mockCtx, nil
1 change: 1 addition & 0 deletions pkg/ddl/job_worker.go
@@ -867,6 +867,7 @@ func (w *worker) runOneJobStep(
if latestJob.IsAlterable() {
job.ReorgMeta.SetConcurrency(latestJob.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())))
job.ReorgMeta.SetBatchSize(latestJob.ReorgMeta.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize())))
job.ReorgMeta.SetMaxWriteSpeed(latestJob.ReorgMeta.GetMaxWriteSpeedOrDefault(int(variable.DDLReorgMaxWriteSpeed.Load())))
}
}
}
11 changes: 11 additions & 0 deletions pkg/executor/operate_ddl_jobs.go
@@ -19,6 +19,7 @@ import (
"fmt"
"strconv"

"github.com/docker/go-units"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/ddl"
@@ -210,6 +211,16 @@ func (e *AlterDDLJobExec) updateReorgMeta(job *model.Job, byWho model.AdminComma
job.ReorgMeta.SetBatchSize(int(cons.Value.GetInt64()))
}
job.AdminOperator = byWho
case core.AlterDDLJobMaxWriteSpeed:
if opt.Value != nil {
cons := opt.Value.(*expression.Constant)
speed, err := units.RAMInBytes(cons.Value.GetString())
if err != nil {
return errors.Trace(err)
}
job.ReorgMeta.SetMaxWriteSpeed(int(speed))
}
job.AdminOperator = byWho
default:
return errors.Errorf("unsupported admin alter ddl jobs config: %s", opt.Name)
}
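The executor converts the string argument to bytes with github.com/docker/go-units before storing it in the reorg meta. A standalone illustration of that parsing step is below; parseMaxWriteSpeed and maxAllowed are illustrative names rather than TiDB's, and the bound mirrors the out-of-range message in the db_test.go case above.

```go
package main

import (
	"fmt"
	"log"

	"github.com/docker/go-units"
)

func parseMaxWriteSpeed(s string) (int64, error) {
	const maxAllowed = int64(1) << 50 // 1 PiB = 1125899906842624 bytes
	speed, err := units.RAMInBytes(s) // accepts "64", "2KB", "3MiB", "4 gb", ...
	if err != nil {
		return 0, err
	}
	if speed < 0 || speed > maxAllowed {
		return 0, fmt.Errorf("the value %s for max_write_speed is out of range [0, %d]", s, maxAllowed)
	}
	return speed, nil
}

func main() {
	for _, in := range []string{"0", "64", "2KB", "3MiB", "4 gb", "2PiB"} {
		n, err := parseMaxWriteSpeed(in)
		if err != nil {
			log.Println(err) // "2PiB" exceeds the upper bound
			continue
		}
		fmt.Printf("%-6s -> %d bytes/s\n", in, n)
	}
}
```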
4 changes: 4 additions & 0 deletions pkg/executor/show_ddl_jobs.go
@@ -317,12 +317,16 @@ func showCommentsFromJob(job *model.Job) string {
if job.MayNeedReorg() {
concurrency := m.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter()))
batchSize := m.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize()))
maxWriteSpeed := m.GetMaxWriteSpeedOrDefault(int(variable.DDLReorgMaxWriteSpeed.Load()))
if concurrency != variable.DefTiDBDDLReorgWorkerCount {
labels = append(labels, fmt.Sprintf("thread=%d", concurrency))
}
if batchSize != variable.DefTiDBDDLReorgBatchSize {
labels = append(labels, fmt.Sprintf("batch_size=%d", batchSize))
}
if maxWriteSpeed != variable.DefTiDBDDLReorgMaxWriteSpeed {
labels = append(labels, fmt.Sprintf("max_write_speed=%d", maxWriteSpeed))
}
if m.TargetScope != "" {
labels = append(labels, fmt.Sprintf("service_scope=%s", m.TargetScope))
}
5 changes: 4 additions & 1 deletion pkg/executor/show_ddl_jobs_test.go
@@ -70,16 +70,18 @@ func TestShowCommentsFromJob(t *testing.T) {
UseCloudStorage: true,
Concurrency: 8,
BatchSize: 1024,
MaxWriteSpeed: 1024 * 1024,
}
res = showCommentsFromJob(job)
require.Equal(t, "ingest, DXF, cloud, thread=8, batch_size=1024", res)
require.Equal(t, "ingest, DXF, cloud, thread=8, batch_size=1024, max_write_speed=1048576", res)

job.ReorgMeta = &model.DDLReorgMeta{
ReorgTp: model.ReorgTypeLitMerge,
IsDistReorg: true,
UseCloudStorage: true,
Concurrency: variable.DefTiDBDDLReorgWorkerCount,
BatchSize: variable.DefTiDBDDLReorgBatchSize,
MaxWriteSpeed: variable.DefTiDBDDLReorgMaxWriteSpeed,
}
res = showCommentsFromJob(job)
require.Equal(t, "ingest, DXF, cloud", res)
@@ -90,6 +92,7 @@
UseCloudStorage: true,
Concurrency: variable.DefTiDBDDLReorgWorkerCount,
BatchSize: variable.DefTiDBDDLReorgBatchSize,
MaxWriteSpeed: variable.DefTiDBDDLReorgMaxWriteSpeed,
TargetScope: "background",
}
res = showCommentsFromJob(job)
7 changes: 1 addition & 6 deletions pkg/lightning/backend/local/local.go
@@ -637,12 +637,7 @@ func NewBackend(
return nil, common.ErrCheckMultiIngest.Wrap(err).GenWithStackByArgs()
}

var writeLimiter StoreWriteLimiter
if config.StoreWriteBWLimit > 0 {
writeLimiter = newStoreWriteLimiter(config.StoreWriteBWLimit)
} else {
writeLimiter = noopStoreWriteLimiter{}
}
writeLimiter := newStoreWriteLimiter(config.StoreWriteBWLimit)
local := &Backend{
pdCli: pdCli,
pdHTTPCli: pdHTTPCli,
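Dropping the noopStoreWriteLimiter branch implies the limiter itself must now treat a non-positive limit as "unlimited" and accept runtime updates (the GetLimiterSpeed/UpdateLimiter calls added in backfilling.go). A rough standalone sketch of such a wrapper over golang.org/x/time/rate follows — illustrative only, not the actual StoreWriteLimiter implementation.

```go
package main

import (
	"context"
	"fmt"
	"math"
	"sync"

	"golang.org/x/time/rate"
)

// storeWriteLimiter (illustrative): a non-positive limit means "no throttling",
// and UpdateLimit retargets the limiter so subsequent WaitN calls see the new rate.
type storeWriteLimiter struct {
	mu      sync.Mutex
	limit   int
	limiter *rate.Limiter
}

func newStoreWriteLimiter(bytesPerSec int) *storeWriteLimiter {
	l := &storeWriteLimiter{}
	l.UpdateLimit(bytesPerSec)
	return l
}

func (l *storeWriteLimiter) UpdateLimit(bytesPerSec int) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.limit = bytesPerSec
	if bytesPerSec <= 0 {
		l.limiter = rate.NewLimiter(rate.Inf, math.MaxInt32) // effectively unlimited
		return
	}
	l.limiter = rate.NewLimiter(rate.Limit(bytesPerSec), bytesPerSec) // burst == limit
}

func (l *storeWriteLimiter) Limit() int {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.limit
}

func (l *storeWriteLimiter) WaitN(ctx context.Context, n int) error {
	l.mu.Lock()
	lim := l.limiter
	l.mu.Unlock()
	return lim.WaitN(ctx, n)
}

func main() {
	l := newStoreWriteLimiter(0) // start unlimited, matching the DDL default
	fmt.Println("limit:", l.Limit())
	l.UpdateLimit(64 << 20) // later raised to 64 MiB/s via ADMIN ALTER DDL JOBS
	fmt.Println("limit:", l.Limit())
	_ = l.WaitN(context.Background(), 1<<10)
}
```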