Skip to content

Commit

Permalink
scheduler: consider evict leader when calc expect (#3967) (#3976)
Browse files Browse the repository at this point in the history
Signed-off-by: lhy1024 <[email protected]>

Co-authored-by: lhy1024 <[email protected]>
  • Loading branch information
ti-chi-bot and lhy1024 authored Sep 14, 2021
1 parent 2d1a00a commit bda68d5
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 0 deletions.
11 changes: 11 additions & 0 deletions pkg/mock/mockcluster/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func (mc *Cluster) LoadRegion(regionID uint64, followerIds ...uint64) {

// GetStoresLoads gets stores load statistics.
func (mc *Cluster) GetStoresLoads() map[uint64][]float64 {
mc.HotStat.FilterUnhealthyStore(mc)
return mc.HotStat.GetStoresLoads()
}

Expand Down Expand Up @@ -437,6 +438,16 @@ func (mc *Cluster) UpdateStoreLeaderWeight(storeID uint64, weight float64) {
mc.PutStore(newStore)
}

// SetStoreEvictLeader set store whether evict leader.
func (mc *Cluster) SetStoreEvictLeader(storeID uint64, enableEvictLeader bool) {
store := mc.GetStore(storeID)
if enableEvictLeader {
mc.PutStore(store.Clone(core.PauseLeaderTransfer()))
} else {
mc.PutStore(store.Clone(core.ResumeLeaderTransfer()))
}
}

// UpdateStoreRegionWeight updates store region weight.
func (mc *Cluster) UpdateStoreRegionWeight(storeID uint64, weight float64) {
store := mc.GetStore(storeID)
Expand Down
3 changes: 3 additions & 0 deletions server/schedulers/hot_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,9 @@ func summaryStoresLoad(
if !ok {
continue
}
if kind == core.LeaderKind && !store.AllowLeaderTransfer() {
continue
}
loads := make([]float64, statistics.DimLen)
switch rwTy {
case read:
Expand Down
100 changes: 100 additions & 0 deletions server/schedulers/hot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,106 @@ func (s *testHotWriteRegionSchedulerSuite) TestWithRuleEnabled(c *C) {
}
}

func (s *testHotWriteRegionSchedulerSuite) TestExpect(c *C) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
statistics.Denoising = false
opt := config.NewTestOptions()
tc := mockcluster.NewCluster(ctx, opt)
tc.DisableFeature(versioninfo.JointConsensus)
tc.SetHotRegionCacheHitsThreshold(0)
sche, err := schedule.CreateScheduler(HotWriteRegionType, schedule.NewOperatorController(ctx, nil, nil), core.NewStorage(kv.NewMemoryKV()), nil)
c.Assert(err, IsNil)
hb := sche.(*hotScheduler)
// Add TiKV stores 1, 2, 3, 4, 5, 6, 7(Down) with region counts 3, 3, 2, 2, 0, 0, 0.
storeCount := uint64(7)
downStoreID := uint64(7)
tc.AddLabelsStore(1, 3, map[string]string{"zone": "z1", "host": "h1"})
tc.AddLabelsStore(2, 3, map[string]string{"zone": "z2", "host": "h2"})
tc.AddLabelsStore(3, 2, map[string]string{"zone": "z3", "host": "h3"})
tc.AddLabelsStore(4, 2, map[string]string{"zone": "z4", "host": "h4"})
tc.AddLabelsStore(5, 0, map[string]string{"zone": "z2", "host": "h5"})
tc.AddLabelsStore(6, 0, map[string]string{"zone": "z5", "host": "h6"})
tc.AddLabelsStore(7, 0, map[string]string{"zone": "z5", "host": "h7"})
for i := uint64(1); i <= storeCount; i++ {
if i != downStoreID {
tc.UpdateStorageWrittenBytes(i, 0)
}
}

//| region_id | leader_store | follower_store | follower_store | written_bytes |
//|-----------|--------------|----------------|----------------|---------------|
//| 1 | 1 | 2 | 3 | 512 KB |
//| 2 | 1 | 3 | 4 | 512 KB |
//| 3 | 1 | 2 | 4 | 512 KB |
//| 4 | 2 | | | 100 B |
// Region 1, 2 and 3 are hot regions.
testRegions := []testRegionInfo{
{1, []uint64{1, 2, 3}, 512 * KB, 5 * KB},
{2, []uint64{1, 3, 4}, 512 * KB, 5 * KB},
{3, []uint64{1, 2, 4}, 512 * KB, 5 * KB},
{4, []uint64{2}, 100, 1},
}
addRegionInfo(tc, write, testRegions)
regionBytesSum := 0.0
regionKeysSum := 0.0
hotRegionBytesSum := 0.0
hotRegionKeysSum := 0.0
for _, r := range testRegions {
regionBytesSum += r.byteRate
regionKeysSum += r.keyRate
}
for _, r := range testRegions[0:3] {
hotRegionBytesSum += r.byteRate
hotRegionKeysSum += r.keyRate
}
for i := 0; i < 20; i++ {
hb.clearPendingInfluence()
op := hb.Schedule(tc)[0]
testutil.CheckTransferLeaderFrom(c, op, operator.OpHotRegion, 1)
}
//| store_id | write_bytes_rate |
//|----------|------------------|
//| 1 | 7.5MB |
//| 2 | 4.5MB |
//| 3 | 4.5MB |
//| 4 | 6MB |
//| 5 | 0MB(Evict)|
//| 6 | 0MB |
//| 7 | n/a (Down)|
storesBytes := map[uint64]uint64{
1: 7.5 * MB * statistics.StoreHeartBeatReportInterval,
2: 4.5 * MB * statistics.StoreHeartBeatReportInterval,
3: 4.5 * MB * statistics.StoreHeartBeatReportInterval,
4: 6 * MB * statistics.StoreHeartBeatReportInterval,
}
tc.SetStoreEvictLeader(5, true)
tikvBytesSum, tikvKeysSum, tikvQuerySum := 0.0, 0.0, 0.0
for i := uint64(1); i <= storeCount; i++ {
tikvBytesSum += float64(storesBytes[i]) / 10
tikvKeysSum += float64(storesBytes[i]/100) / 10
tikvQuerySum += float64(storesBytes[i]/100) / 10
}
for i := uint64(1); i <= storeCount; i++ {
if i != downStoreID {
tc.UpdateStorageWrittenBytes(i, storesBytes[i])
}
}
{ // Check the load expect
aliveTiKVCount := storeCount
allowLeaderTiKVCount := aliveTiKVCount - 2 // store 5 with evict leader, store 7 is down
c.Assert(len(hb.Schedule(tc)) == 0, IsFalse)
c.Assert(nearlyAbout(
hb.stLoadInfos[writeLeader][1].LoadPred.Expect.Loads[statistics.ByteDim],
hotRegionBytesSum/float64(allowLeaderTiKVCount)),
IsTrue)
c.Assert(nearlyAbout(
hb.stLoadInfos[writeLeader][1].LoadPred.Expect.Loads[statistics.KeyDim],
hotRegionKeysSum/float64(allowLeaderTiKVCount)),
IsTrue)
}
}

func (s *testHotReadRegionSchedulerSuite) TestByteRateOnly(c *C) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down

0 comments on commit bda68d5

Please sign in to comment.