From 5dacd282d531f77f534074cb15d4a1d9844d8f33 Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Fri, 24 Jun 2022 13:49:55 +0800 Subject: [PATCH] tp: prioritize schedulers (#6027) Signed-off-by: Neil Shen --- cdc/scheduler/internal/tp/scheduler.go | 29 ++++++--------- .../internal/tp/scheduler_balance.go | 2 +- cdc/scheduler/internal/tp/scheduler_basic.go | 2 +- .../internal/tp/scheduler_basic_test.go | 11 ++++++ .../internal/tp/scheduler_drain_capture.go | 2 +- .../internal/tp/scheduler_manager.go | 35 +++++++------------ .../internal/tp/scheduler_manager_test.go | 10 +++--- .../internal/tp/scheduler_move_table.go | 2 +- .../internal/tp/scheduler_rebalance.go | 2 +- 9 files changed, 45 insertions(+), 50 deletions(-) diff --git a/cdc/scheduler/internal/tp/scheduler.go b/cdc/scheduler/internal/tp/scheduler.go index d201ab414c7..151c7dc260d 100644 --- a/cdc/scheduler/internal/tp/scheduler.go +++ b/cdc/scheduler/internal/tp/scheduler.go @@ -27,24 +27,17 @@ type scheduler interface { ) []*scheduleTask } -type schedulerType int +// schedulerPriority is the priority of each scheduler. +// Lower value has higher priority. +type schedulerPriority int const ( - schedulerTypeBasic schedulerType = 0 - schedulerTypeBalance schedulerType = 1 - schedulerTypeMoveTable schedulerType = 2 - schedulerTypeRebalance schedulerType = 3 - schedulerTypeDrainCapture schedulerType = 4 + // schedulerPriorityBasic has the highest priority. + schedulerPriorityBasic schedulerPriority = iota + // schedulerPriorityDrainCapture has higher priority than other schedulers. + schedulerPriorityDrainCapture + schedulerPriorityMoveTable + schedulerPriorityRebalance + schedulerPriorityBalance + schedulerPriorityMax ) - -var schedulerTypeName = map[int]string{ - 0: "basic-scheduler", - 1: "balance-scheduler", - 2: "move-table-scheduler", - 3: "rebalance-scheduler", - 4: "drain-capture-scheduler", -} - -func (s schedulerType) String() string { - return schedulerTypeName[int(s)] -} diff --git a/cdc/scheduler/internal/tp/scheduler_balance.go b/cdc/scheduler/internal/tp/scheduler_balance.go index fb6df5d2718..7be7e967cde 100644 --- a/cdc/scheduler/internal/tp/scheduler_balance.go +++ b/cdc/scheduler/internal/tp/scheduler_balance.go @@ -37,7 +37,7 @@ func newBalanceScheduler(interval time.Duration) *balanceScheduler { } func (b *balanceScheduler) Name() string { - return schedulerTypeBalance.String() + return "balance-scheduler" } func (b *balanceScheduler) Schedule( diff --git a/cdc/scheduler/internal/tp/scheduler_basic.go b/cdc/scheduler/internal/tp/scheduler_basic.go index 50d7112adc5..e25ee6d1689 100644 --- a/cdc/scheduler/internal/tp/scheduler_basic.go +++ b/cdc/scheduler/internal/tp/scheduler_basic.go @@ -44,7 +44,7 @@ func newBasicScheduler() *basicScheduler { } func (b *basicScheduler) Name() string { - return schedulerTypeBasic.String() + return "basic-scheduler" } func (b *basicScheduler) Schedule( diff --git a/cdc/scheduler/internal/tp/scheduler_basic_test.go b/cdc/scheduler/internal/tp/scheduler_basic_test.go index 463096cd127..8afc292ea20 100644 --- a/cdc/scheduler/internal/tp/scheduler_basic_test.go +++ b/cdc/scheduler/internal/tp/scheduler_basic_test.go @@ -85,6 +85,17 @@ func TestSchedulerBasic(t *testing.T) { require.Equal(t, tasks[0].burstBalance.RemoveTables[0].TableID, model.TableID(5)) } +func TestSchedulerPriority(t *testing.T) { + t.Parallel() + + // The basic scheduler have the highest priority. + require.Less(t, schedulerPriorityBasic, schedulerPriorityDrainCapture) + require.Less(t, schedulerPriorityBasic, schedulerPriorityBalance) + require.Less(t, schedulerPriorityBasic, schedulerPriorityMoveTable) + require.Less(t, schedulerPriorityBasic, schedulerPriorityRebalance) + require.Less(t, schedulerPriorityBasic, schedulerPriorityMax) +} + func benchmarkSchedulerBalance( b *testing.B, factory func(total int) ( diff --git a/cdc/scheduler/internal/tp/scheduler_drain_capture.go b/cdc/scheduler/internal/tp/scheduler_drain_capture.go index 70843e88ecf..3474af862c7 100644 --- a/cdc/scheduler/internal/tp/scheduler_drain_capture.go +++ b/cdc/scheduler/internal/tp/scheduler_drain_capture.go @@ -46,7 +46,7 @@ func newDrainCaptureScheduler(concurrency int) *drainCaptureScheduler { } func (d *drainCaptureScheduler) Name() string { - return schedulerTypeDrainCapture.String() + return "drain-capture-scheduler" } func (d *drainCaptureScheduler) getTarget() model.CaptureID { diff --git a/cdc/scheduler/internal/tp/scheduler_manager.go b/cdc/scheduler/internal/tp/scheduler_manager.go index 8fa2c2398bc..9f7c54983a7 100644 --- a/cdc/scheduler/internal/tp/scheduler_manager.go +++ b/cdc/scheduler/internal/tp/scheduler_manager.go @@ -26,16 +26,16 @@ import ( type schedulerManager struct { changefeedID model.ChangeFeedID - schedulers map[schedulerType]scheduler + schedulers []scheduler tasksCounter map[struct{ scheduler, task string }]int } -func newSchedulerManager(changefeedID model.ChangeFeedID, - cfg *config.SchedulerConfig, +func newSchedulerManager( + changefeedID model.ChangeFeedID, cfg *config.SchedulerConfig, ) *schedulerManager { sm := &schedulerManager{ changefeedID: changefeedID, - schedulers: make(map[schedulerType]scheduler), + schedulers: make([]scheduler, schedulerPriorityMax), tasksCounter: make(map[struct { scheduler string task string @@ -44,11 +44,11 @@ func newSchedulerManager(changefeedID model.ChangeFeedID, balanceInterval := time.Duration(cfg.CheckBalanceInterval) - sm.schedulers[schedulerTypeBasic] = newBasicScheduler() - sm.schedulers[schedulerTypeBalance] = newBalanceScheduler(balanceInterval) - sm.schedulers[schedulerTypeMoveTable] = newMoveTableScheduler() - sm.schedulers[schedulerTypeRebalance] = newRebalanceScheduler() - sm.schedulers[schedulerTypeDrainCapture] = newDrainCaptureScheduler(cfg.MaxTaskConcurrency) + sm.schedulers[schedulerPriorityBasic] = newBasicScheduler() + sm.schedulers[schedulerPriorityDrainCapture] = newDrainCaptureScheduler(cfg.MaxTaskConcurrency) + sm.schedulers[schedulerPriorityBalance] = newBalanceScheduler(balanceInterval) + sm.schedulers[schedulerPriorityMoveTable] = newMoveTableScheduler() + sm.schedulers[schedulerPriorityRebalance] = newRebalanceScheduler() return sm } @@ -81,10 +81,7 @@ func (sm *schedulerManager) Schedule( } func (sm *schedulerManager) MoveTable(tableID model.TableID, target model.CaptureID) { - scheduler, ok := sm.schedulers[schedulerTypeMoveTable] - if !ok { - log.Panic("tpscheduler: move table scheduler not found") - } + scheduler := sm.schedulers[schedulerPriorityMoveTable] moveTableScheduler, ok := scheduler.(*moveTableScheduler) if !ok { log.Panic("tpscheduler: invalid move table scheduler found") @@ -100,10 +97,7 @@ func (sm *schedulerManager) MoveTable(tableID model.TableID, target model.Captur } func (sm *schedulerManager) Rebalance() { - scheduler, ok := sm.schedulers[schedulerTypeRebalance] - if !ok { - log.Panic("tpscheduler: rebalance scheduler not found") - } + scheduler := sm.schedulers[schedulerPriorityRebalance] rebalanceScheduler, ok := scheduler.(*rebalanceScheduler) if !ok { log.Panic("tpscheduler: invalid rebalance scheduler found") @@ -113,10 +107,7 @@ func (sm *schedulerManager) Rebalance() { } func (sm *schedulerManager) DrainCapture(target model.CaptureID) bool { - scheduler, ok := sm.schedulers[schedulerTypeDrainCapture] - if !ok { - log.Panic("tpscheduler: drain capture scheduler not found") - } + scheduler := sm.schedulers[schedulerPriorityDrainCapture] drainCaptureScheduler, ok := scheduler.(*drainCaptureScheduler) if !ok { log.Panic("tpscheduler: invalid drain capture scheduler found") @@ -126,7 +117,7 @@ func (sm *schedulerManager) DrainCapture(target model.CaptureID) bool { } func (sm *schedulerManager) DrainingTarget() model.CaptureID { - return sm.schedulers[schedulerTypeDrainCapture].(*drainCaptureScheduler).getTarget() + return sm.schedulers[schedulerPriorityDrainCapture].(*drainCaptureScheduler).getTarget() } func (sm *schedulerManager) CollectMetrics() { diff --git a/cdc/scheduler/internal/tp/scheduler_manager_test.go b/cdc/scheduler/internal/tp/scheduler_manager_test.go index 0c9dc9a6a22..6e524082b6c 100644 --- a/cdc/scheduler/internal/tp/scheduler_manager_test.go +++ b/cdc/scheduler/internal/tp/scheduler_manager_test.go @@ -27,9 +27,9 @@ func TestNewSchedulerManager(t *testing.T) { m := newSchedulerManager(model.DefaultChangeFeedID("test-changefeed"), config.NewDefaultSchedulerConfig()) require.NotNil(t, m) - require.NotNil(t, m.schedulers[schedulerTypeBasic]) - require.NotNil(t, m.schedulers[schedulerTypeBalance]) - require.NotNil(t, m.schedulers[schedulerTypeMoveTable]) - require.NotNil(t, m.schedulers[schedulerTypeRebalance]) - require.NotNil(t, m.schedulers[schedulerTypeDrainCapture]) + require.NotNil(t, m.schedulers[schedulerPriorityBasic]) + require.NotNil(t, m.schedulers[schedulerPriorityBalance]) + require.NotNil(t, m.schedulers[schedulerPriorityMoveTable]) + require.NotNil(t, m.schedulers[schedulerPriorityRebalance]) + require.NotNil(t, m.schedulers[schedulerPriorityDrainCapture]) } diff --git a/cdc/scheduler/internal/tp/scheduler_move_table.go b/cdc/scheduler/internal/tp/scheduler_move_table.go index f5808998095..3daa9b9438f 100644 --- a/cdc/scheduler/internal/tp/scheduler_move_table.go +++ b/cdc/scheduler/internal/tp/scheduler_move_table.go @@ -35,7 +35,7 @@ func newMoveTableScheduler() *moveTableScheduler { } func (m *moveTableScheduler) Name() string { - return schedulerTypeMoveTable.String() + return "move-table-scheduler" } func (m *moveTableScheduler) addTask(tableID model.TableID, target model.CaptureID) bool { diff --git a/cdc/scheduler/internal/tp/scheduler_rebalance.go b/cdc/scheduler/internal/tp/scheduler_rebalance.go index 193f74d299d..6ebc3ebd5a9 100644 --- a/cdc/scheduler/internal/tp/scheduler_rebalance.go +++ b/cdc/scheduler/internal/tp/scheduler_rebalance.go @@ -39,7 +39,7 @@ func newRebalanceScheduler() *rebalanceScheduler { } func (r *rebalanceScheduler) Name() string { - return schedulerTypeRebalance.String() + return "rebalance-scheduler" } func (r *rebalanceScheduler) Schedule(