Skip to content

Commit

Permalink
tp: prioritize schedulers (#6027)
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Shen <[email protected]>
  • Loading branch information
overvenus committed Jun 24, 2022
1 parent 42d90c5 commit 5dacd28
Show file tree
Hide file tree
Showing 9 changed files with 45 additions and 50 deletions.
29 changes: 11 additions & 18 deletions cdc/scheduler/internal/tp/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,24 +27,17 @@ type scheduler interface {
) []*scheduleTask
}

type schedulerType int
// schedulerPriority is the priority of each scheduler.
// Lower value has higher priority.
type schedulerPriority int

const (
schedulerTypeBasic schedulerType = 0
schedulerTypeBalance schedulerType = 1
schedulerTypeMoveTable schedulerType = 2
schedulerTypeRebalance schedulerType = 3
schedulerTypeDrainCapture schedulerType = 4
// schedulerPriorityBasic has the highest priority.
schedulerPriorityBasic schedulerPriority = iota
// schedulerPriorityDrainCapture has higher priority than other schedulers.
schedulerPriorityDrainCapture
schedulerPriorityMoveTable
schedulerPriorityRebalance
schedulerPriorityBalance
schedulerPriorityMax
)

var schedulerTypeName = map[int]string{
0: "basic-scheduler",
1: "balance-scheduler",
2: "move-table-scheduler",
3: "rebalance-scheduler",
4: "drain-capture-scheduler",
}

func (s schedulerType) String() string {
return schedulerTypeName[int(s)]
}
2 changes: 1 addition & 1 deletion cdc/scheduler/internal/tp/scheduler_balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func newBalanceScheduler(interval time.Duration) *balanceScheduler {
}

func (b *balanceScheduler) Name() string {
return schedulerTypeBalance.String()
return "balance-scheduler"
}

func (b *balanceScheduler) Schedule(
Expand Down
2 changes: 1 addition & 1 deletion cdc/scheduler/internal/tp/scheduler_basic.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func newBasicScheduler() *basicScheduler {
}

func (b *basicScheduler) Name() string {
return schedulerTypeBasic.String()
return "basic-scheduler"
}

func (b *basicScheduler) Schedule(
Expand Down
11 changes: 11 additions & 0 deletions cdc/scheduler/internal/tp/scheduler_basic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,17 @@ func TestSchedulerBasic(t *testing.T) {
require.Equal(t, tasks[0].burstBalance.RemoveTables[0].TableID, model.TableID(5))
}

func TestSchedulerPriority(t *testing.T) {
t.Parallel()

// The basic scheduler have the highest priority.
require.Less(t, schedulerPriorityBasic, schedulerPriorityDrainCapture)
require.Less(t, schedulerPriorityBasic, schedulerPriorityBalance)
require.Less(t, schedulerPriorityBasic, schedulerPriorityMoveTable)
require.Less(t, schedulerPriorityBasic, schedulerPriorityRebalance)
require.Less(t, schedulerPriorityBasic, schedulerPriorityMax)
}

func benchmarkSchedulerBalance(
b *testing.B,
factory func(total int) (
Expand Down
2 changes: 1 addition & 1 deletion cdc/scheduler/internal/tp/scheduler_drain_capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func newDrainCaptureScheduler(concurrency int) *drainCaptureScheduler {
}

func (d *drainCaptureScheduler) Name() string {
return schedulerTypeDrainCapture.String()
return "drain-capture-scheduler"
}

func (d *drainCaptureScheduler) getTarget() model.CaptureID {
Expand Down
35 changes: 13 additions & 22 deletions cdc/scheduler/internal/tp/scheduler_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,16 @@ import (
type schedulerManager struct {
changefeedID model.ChangeFeedID

schedulers map[schedulerType]scheduler
schedulers []scheduler
tasksCounter map[struct{ scheduler, task string }]int
}

func newSchedulerManager(changefeedID model.ChangeFeedID,
cfg *config.SchedulerConfig,
func newSchedulerManager(
changefeedID model.ChangeFeedID, cfg *config.SchedulerConfig,
) *schedulerManager {
sm := &schedulerManager{
changefeedID: changefeedID,
schedulers: make(map[schedulerType]scheduler),
schedulers: make([]scheduler, schedulerPriorityMax),
tasksCounter: make(map[struct {
scheduler string
task string
Expand All @@ -44,11 +44,11 @@ func newSchedulerManager(changefeedID model.ChangeFeedID,

balanceInterval := time.Duration(cfg.CheckBalanceInterval)

sm.schedulers[schedulerTypeBasic] = newBasicScheduler()
sm.schedulers[schedulerTypeBalance] = newBalanceScheduler(balanceInterval)
sm.schedulers[schedulerTypeMoveTable] = newMoveTableScheduler()
sm.schedulers[schedulerTypeRebalance] = newRebalanceScheduler()
sm.schedulers[schedulerTypeDrainCapture] = newDrainCaptureScheduler(cfg.MaxTaskConcurrency)
sm.schedulers[schedulerPriorityBasic] = newBasicScheduler()
sm.schedulers[schedulerPriorityDrainCapture] = newDrainCaptureScheduler(cfg.MaxTaskConcurrency)
sm.schedulers[schedulerPriorityBalance] = newBalanceScheduler(balanceInterval)
sm.schedulers[schedulerPriorityMoveTable] = newMoveTableScheduler()
sm.schedulers[schedulerPriorityRebalance] = newRebalanceScheduler()

return sm
}
Expand Down Expand Up @@ -81,10 +81,7 @@ func (sm *schedulerManager) Schedule(
}

func (sm *schedulerManager) MoveTable(tableID model.TableID, target model.CaptureID) {
scheduler, ok := sm.schedulers[schedulerTypeMoveTable]
if !ok {
log.Panic("tpscheduler: move table scheduler not found")
}
scheduler := sm.schedulers[schedulerPriorityMoveTable]
moveTableScheduler, ok := scheduler.(*moveTableScheduler)
if !ok {
log.Panic("tpscheduler: invalid move table scheduler found")
Expand All @@ -100,10 +97,7 @@ func (sm *schedulerManager) MoveTable(tableID model.TableID, target model.Captur
}

func (sm *schedulerManager) Rebalance() {
scheduler, ok := sm.schedulers[schedulerTypeRebalance]
if !ok {
log.Panic("tpscheduler: rebalance scheduler not found")
}
scheduler := sm.schedulers[schedulerPriorityRebalance]
rebalanceScheduler, ok := scheduler.(*rebalanceScheduler)
if !ok {
log.Panic("tpscheduler: invalid rebalance scheduler found")
Expand All @@ -113,10 +107,7 @@ func (sm *schedulerManager) Rebalance() {
}

func (sm *schedulerManager) DrainCapture(target model.CaptureID) bool {
scheduler, ok := sm.schedulers[schedulerTypeDrainCapture]
if !ok {
log.Panic("tpscheduler: drain capture scheduler not found")
}
scheduler := sm.schedulers[schedulerPriorityDrainCapture]
drainCaptureScheduler, ok := scheduler.(*drainCaptureScheduler)
if !ok {
log.Panic("tpscheduler: invalid drain capture scheduler found")
Expand All @@ -126,7 +117,7 @@ func (sm *schedulerManager) DrainCapture(target model.CaptureID) bool {
}

func (sm *schedulerManager) DrainingTarget() model.CaptureID {
return sm.schedulers[schedulerTypeDrainCapture].(*drainCaptureScheduler).getTarget()
return sm.schedulers[schedulerPriorityDrainCapture].(*drainCaptureScheduler).getTarget()
}

func (sm *schedulerManager) CollectMetrics() {
Expand Down
10 changes: 5 additions & 5 deletions cdc/scheduler/internal/tp/scheduler_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ func TestNewSchedulerManager(t *testing.T) {
m := newSchedulerManager(model.DefaultChangeFeedID("test-changefeed"),
config.NewDefaultSchedulerConfig())
require.NotNil(t, m)
require.NotNil(t, m.schedulers[schedulerTypeBasic])
require.NotNil(t, m.schedulers[schedulerTypeBalance])
require.NotNil(t, m.schedulers[schedulerTypeMoveTable])
require.NotNil(t, m.schedulers[schedulerTypeRebalance])
require.NotNil(t, m.schedulers[schedulerTypeDrainCapture])
require.NotNil(t, m.schedulers[schedulerPriorityBasic])
require.NotNil(t, m.schedulers[schedulerPriorityBalance])
require.NotNil(t, m.schedulers[schedulerPriorityMoveTable])
require.NotNil(t, m.schedulers[schedulerPriorityRebalance])
require.NotNil(t, m.schedulers[schedulerPriorityDrainCapture])
}
2 changes: 1 addition & 1 deletion cdc/scheduler/internal/tp/scheduler_move_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func newMoveTableScheduler() *moveTableScheduler {
}

func (m *moveTableScheduler) Name() string {
return schedulerTypeMoveTable.String()
return "move-table-scheduler"
}

func (m *moveTableScheduler) addTask(tableID model.TableID, target model.CaptureID) bool {
Expand Down
2 changes: 1 addition & 1 deletion cdc/scheduler/internal/tp/scheduler_rebalance.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func newRebalanceScheduler() *rebalanceScheduler {
}

func (r *rebalanceScheduler) Name() string {
return schedulerTypeRebalance.String()
return "rebalance-scheduler"
}

func (r *rebalanceScheduler) Schedule(
Expand Down

0 comments on commit 5dacd28

Please sign in to comment.