Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scheduler: consider evict leader when calc expect (#3967) #3977

Merged
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions pkg/mock/mockcluster/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,16 @@ func (mc *Cluster) UpdateStoreLeaderWeight(storeID uint64, weight float64) {
mc.PutStore(newStore)
}

// SetStoreEvictLeader toggles leader eviction for the store with the given ID.
// The current store is cloned with core.PauseLeaderTransfer when
// enableEvictLeader is true, or with core.ResumeLeaderTransfer otherwise, and
// the clone is written back to the mock cluster through PutStore.
func (mc *Cluster) SetStoreEvictLeader(storeID uint64, enableEvictLeader bool) {
	current := mc.GetStore(storeID)
	if !enableEvictLeader {
		// Eviction disabled: put back a copy with leader transfer resumed.
		mc.PutStore(current.Clone(core.ResumeLeaderTransfer()))
		return
	}
	// Eviction enabled: put back a copy with leader transfer paused.
	mc.PutStore(current.Clone(core.PauseLeaderTransfer()))
}

// UpdateStoreRegionWeight updates store region weight.
func (mc *Cluster) UpdateStoreRegionWeight(storeID uint64, weight float64) {
store := mc.GetStore(storeID)
Expand Down
2 changes: 1 addition & 1 deletion server/schedulers/hot_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ const (
var schedulePeerPr = 0.66

// pendingAmpFactor will amplify the impact of pending influence, making scheduling slower or even serial when two stores are close together
var pendingAmpFactor = 8.0
var pendingAmpFactor = 2.0

type hotScheduler struct {
name string
Expand Down
2 changes: 1 addition & 1 deletion server/schedulers/hot_region_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ const (
// KeyPriority indicates hot-region-scheduler prefer key dim
KeyPriority = "key"
// QueryPriority indicates hot-region-scheduler prefer query dim
QueryPriority = "qps"
QueryPriority = "query"

// Scheduling has a bigger impact on TiFlash, so it needs to be corrected in configuration items
// In the default config, the TiKV difference is 1.05*1.05-1 = 0.1025, and the TiFlash difference is 1.15*1.15-1 = 0.3225
Expand Down
28 changes: 15 additions & 13 deletions server/schedulers/hot_region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ func (s *testHotWriteRegionSchedulerSuite) TestByteRateOnlyWithTiFlash(c *C) {
//| 2 | 4.5MB |
//| 3 | 4.5MB |
//| 4 | 6MB |
//| 5 | 0MB |
//| 5 | 0MB(Evict)|
//| 6 | 0MB |
//| 7 | n/a (Down)|
//| 8 | n/a | <- TiFlash is always 0.
Expand All @@ -465,6 +465,7 @@ func (s *testHotWriteRegionSchedulerSuite) TestByteRateOnlyWithTiFlash(c *C) {
3: 4.5 * MB * statistics.StoreHeartBeatReportInterval,
4: 6 * MB * statistics.StoreHeartBeatReportInterval,
}
tc.SetStoreEvictLeader(5, true)
tikvBytesSum, tikvKeysSum, tikvQuerySum := 0.0, 0.0, 0.0
for i := aliveTiKVStartID; i <= aliveTiKVLastID; i++ {
tikvBytesSum += float64(storesBytes[i]) / 10
Expand All @@ -478,13 +479,14 @@ func (s *testHotWriteRegionSchedulerSuite) TestByteRateOnlyWithTiFlash(c *C) {
}
{ // Check the load expect
aliveTiKVCount := float64(aliveTiKVLastID - aliveTiKVStartID + 1)
allowLeaderTiKVCount := aliveTiKVCount - 1 // store 5 with evict leader
aliveTiFlashCount := float64(aliveTiFlashLastID - aliveTiFlashStartID + 1)
tc.ObserveRegionsStats()
c.Assert(len(hb.Schedule(tc)) == 0, IsFalse)
c.Assert(
loadsEqual(
hb.stLoadInfos[writeLeader][1].LoadPred.Expect.Loads,
[]float64{hotRegionBytesSum / aliveTiKVCount, hotRegionKeysSum / aliveTiKVCount, tikvQuerySum / aliveTiKVCount}),
[]float64{hotRegionBytesSum / allowLeaderTiKVCount, hotRegionKeysSum / allowLeaderTiKVCount, tikvQuerySum / allowLeaderTiKVCount}),
IsTrue)
c.Assert(tikvQuerySum != hotRegionQuerySum, IsTrue)
c.Assert(
Expand Down Expand Up @@ -549,7 +551,7 @@ func (s *testHotWriteRegionSchedulerSuite) TestWithQuery(c *C) {
c.Assert(err, IsNil)
hb.(*hotScheduler).conf.SetSrcToleranceRatio(1)
hb.(*hotScheduler).conf.SetDstToleranceRatio(1)
hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{"qps", "byte"}
hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{QueryPriority, BytePriority}

tc := mockcluster.NewCluster(ctx, opt)
tc.SetHotRegionCacheHitsThreshold(0)
Expand Down Expand Up @@ -1905,9 +1907,9 @@ func (s *testHotSchedulerSuite) TestCompatibility(c *C) {
{statistics.ByteDim, statistics.KeyDim},
})
// config error value
hb.(*hotScheduler).conf.ReadPriorities = []string{"hahaha"}
hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{"hahaha", "byte"}
hb.(*hotScheduler).conf.WritePeerPriorities = []string{"qps", "byte", "key"}
hb.(*hotScheduler).conf.ReadPriorities = []string{"error"}
hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{"error", BytePriority}
hb.(*hotScheduler).conf.WritePeerPriorities = []string{QueryPriority, BytePriority, KeyPriority}
checkPriority(c, hb.(*hotScheduler), tc, [3][2]int{
{statistics.QueryDim, statistics.ByteDim},
{statistics.KeyDim, statistics.ByteDim},
Expand All @@ -1921,18 +1923,18 @@ func (s *testHotSchedulerSuite) TestCompatibility(c *C) {
{statistics.ByteDim, statistics.KeyDim},
})
// config byte and key
hb.(*hotScheduler).conf.ReadPriorities = []string{"key", "byte"}
hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{"byte", "key"}
hb.(*hotScheduler).conf.WritePeerPriorities = []string{"key", "byte"}
hb.(*hotScheduler).conf.ReadPriorities = []string{KeyPriority, BytePriority}
hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{BytePriority, KeyPriority}
hb.(*hotScheduler).conf.WritePeerPriorities = []string{KeyPriority, BytePriority}
checkPriority(c, hb.(*hotScheduler), tc, [3][2]int{
{statistics.KeyDim, statistics.ByteDim},
{statistics.ByteDim, statistics.KeyDim},
{statistics.KeyDim, statistics.ByteDim},
})
// config query in low version
hb.(*hotScheduler).conf.ReadPriorities = []string{"qps", "byte"}
hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{"qps", "byte"}
hb.(*hotScheduler).conf.WritePeerPriorities = []string{"qps", "byte"}
hb.(*hotScheduler).conf.ReadPriorities = []string{QueryPriority, BytePriority}
hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{QueryPriority, BytePriority}
hb.(*hotScheduler).conf.WritePeerPriorities = []string{QueryPriority, BytePriority}
checkPriority(c, hb.(*hotScheduler), tc, [3][2]int{
{statistics.ByteDim, statistics.KeyDim},
{statistics.KeyDim, statistics.ByteDim},
Expand All @@ -1941,7 +1943,7 @@ func (s *testHotSchedulerSuite) TestCompatibility(c *C) {
// config error value
hb.(*hotScheduler).conf.ReadPriorities = []string{"error", "error"}
hb.(*hotScheduler).conf.WriteLeaderPriorities = []string{}
hb.(*hotScheduler).conf.WritePeerPriorities = []string{"qps", "byte", "key"}
hb.(*hotScheduler).conf.WritePeerPriorities = []string{QueryPriority, BytePriority, KeyPriority}
checkPriority(c, hb.(*hotScheduler), tc, [3][2]int{
{statistics.ByteDim, statistics.KeyDim},
{statistics.KeyDim, statistics.ByteDim},
Expand Down
32 changes: 25 additions & 7 deletions server/schedulers/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ type storeCollector interface {
// Engine returns the type of Store.
Engine() string
// Filter determines whether the Store needs to be handled by itself.
Filter(info *storeSummaryInfo) bool
Filter(info *storeSummaryInfo, kind core.ResourceKind) bool
// GetLoads obtains available loads from storeLoads and peerLoadSum according to rwTy and kind.
GetLoads(storeLoads, peerLoadSum []float64, rwTy rwType, kind core.ResourceKind) (loads []float64)
}
Expand All @@ -522,8 +522,17 @@ func (c tikvCollector) Engine() string {
return core.EngineTiKV
}

func (c tikvCollector) Filter(info *storeSummaryInfo) bool {
return !info.IsTiFlash
// Filter reports whether the store described by info should be handled by the
// TiKV collector for the given resource kind.
//
// TiFlash stores are always rejected. For core.LeaderKind the answer is
// whatever info.Store.AllowLeaderTransfer() returns; for core.RegionKind every
// non-TiFlash store is accepted. Any other kind is rejected.
func (c tikvCollector) Filter(info *storeSummaryInfo, kind core.ResourceKind) bool {
	switch {
	case info.IsTiFlash:
		// TiFlash stores belong to the TiFlash collector, never this one.
		return false
	case kind == core.LeaderKind:
		// Presumably excludes stores under leader eviction — confirm against core.StoreInfo.
		return info.Store.AllowLeaderTransfer()
	default:
		// Only the region kind is accepted unconditionally; unknown kinds are refused.
		return kind == core.RegionKind
	}
}

func (c tikvCollector) GetLoads(storeLoads, peerLoadSum []float64, rwTy rwType, kind core.ResourceKind) (loads []float64) {
Expand Down Expand Up @@ -564,8 +573,14 @@ func (c tiflashCollector) Engine() string {
return core.EngineTiFlash
}

func (c tiflashCollector) Filter(info *storeSummaryInfo) bool {
return info.IsTiFlash
// Filter reports whether the store described by info should be handled by the
// TiFlash collector for the given resource kind.
//
// It returns true only when kind is core.RegionKind and info.IsTiFlash is set;
// core.LeaderKind and any other kind always yield false.
func (c tiflashCollector) Filter(info *storeSummaryInfo, kind core.ResourceKind) bool {
	// The kind is tested first so info is only read for the region kind.
	return kind == core.RegionKind && info.IsTiFlash
}

func (c tiflashCollector) GetLoads(storeLoads, peerLoadSum []float64, rwTy rwType, kind core.ResourceKind) (loads []float64) {
Expand Down Expand Up @@ -640,12 +655,11 @@ func summaryStoresLoadByEngine(
allStoreCount := 0
allHotPeersCount := 0

// Stores without byte rate statistics are not available to schedule.
for _, info := range storeInfos {
store := info.Store
id := store.GetID()
storeLoads, ok := storesLoads[id]
if !ok || !collector.Filter(info) {
if !ok || !collector.Filter(info, kind) {
continue
}

Expand Down Expand Up @@ -691,6 +705,10 @@ func summaryStoresLoadByEngine(
})
}

if allStoreCount == 0 {
return loadDetail
}

expectCount := float64(allHotPeersCount) / float64(allStoreCount)
expectLoads := make([]float64, len(allStoreLoadSum))
for i := range expectLoads {
Expand Down
4 changes: 2 additions & 2 deletions tests/pdctl/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func (s *schedulerTestSuite) TestScheduler(c *C) {
"minor-dec-ratio": 0.99,
"src-tolerance-ratio": 1.05,
"dst-tolerance-ratio": 1.05,
"read-priorities": []interface{}{"qps", "byte"},
"read-priorities": []interface{}{"query", "byte"},
"write-leader-priorities": []interface{}{"key", "byte"},
"write-peer-priorities": []interface{}{"byte", "key"},
"strict-picking-store": "true",
Expand Down Expand Up @@ -316,7 +316,7 @@ func (s *schedulerTestSuite) TestScheduler(c *C) {
mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1)
c.Assert(conf1, DeepEquals, expected1)
// cannot set qps as write-peer-priorities
mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "write-peer-priorities", "qps,byte"}, nil)
mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "set", "write-peer-priorities", "query,byte"}, nil)
mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler"}, &conf1)
c.Assert(conf1, DeepEquals, expected1)

Expand Down