Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
Signed-off-by: lhy1024 <[email protected]>
  • Loading branch information
lhy1024 committed Sep 6, 2021
1 parent 38aaf9a commit 7f5012a
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 0 deletions.
11 changes: 11 additions & 0 deletions pkg/mock/mockcluster/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ func (mc *Cluster) LoadRegion(regionID uint64, followerIds ...uint64) {

// GetStoresStats returns the cluster's store statistics, first dropping
// entries for unhealthy stores so callers only see usable stores.
func (mc *Cluster) GetStoresStats() *statistics.StoresStats {
	stats := mc.StoresStats
	stats.FilterUnhealthyStore(mc)
	return stats
}

Expand Down Expand Up @@ -515,6 +516,16 @@ func (mc *Cluster) UpdateStoreStatus(id uint64) {
mc.PutStore(newStore)
}

// SetStoreEvictLeader marks or unmarks the given store as evicting leaders.
// Evict-leader is modeled by the store's block flag in this mock cluster.
func (mc *Cluster) SetStoreEvictLeader(storeID uint64, enableEvictLeader bool) {
	opt := core.SetStoreUnBlock()
	if enableEvictLeader {
		opt = core.SetStoreBlock()
	}
	mc.PutStore(mc.GetStore(storeID).Clone(opt))
}

// newMockRegionInfo builds a mock region with the given leader and followers,
// no learners, and no extra region options.
func (mc *Cluster) newMockRegionInfo(regionID uint64, leaderStoreID uint64, followerStoreIDs ...uint64) *core.RegionInfo {
	learnerStoreIDs := []uint64{}
	return mc.MockRegionInfo(regionID, leaderStoreID, followerStoreIDs, learnerStoreIDs, nil)
}
Expand Down
3 changes: 3 additions & 0 deletions server/schedulers/hot_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,9 @@ func summaryStoresLoad(
if !ok {
continue
}
if kind == core.LeaderKind && store.IsBlocked() {
continue
}
keyRate := storeKeyRate[id]

// Find all hot peers first
Expand Down
106 changes: 106 additions & 0 deletions server/schedulers/hot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,112 @@ func (s *testHotReadRegionSchedulerSuite) TestByteRateOnly(c *C) {
hb.(*hotScheduler).clearPendingInfluence()
}

// TestExpect verifies the hot-write scheduler's per-store load expectation:
// the expected byte/key rate for write leaders must be averaged only over
// stores that are allowed to hold leaders, excluding a store marked
// evict-leader and a down store.
func (s *testHotWriteRegionSchedulerSuite) TestExpect(c *C) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	// Disable denoising so the scheduler reacts to the synthetic stats immediately.
	statistics.Denoising = false
	opt := mockoption.NewScheduleOptions()
	tc := mockcluster.NewCluster(opt)
	opt.HotRegionCacheHitsThreshold = 0
	sche, err := schedule.CreateScheduler(HotWriteRegionType, schedule.NewOperatorController(ctx, nil, nil), core.NewStorage(kv.NewMemoryKV()), nil)
	c.Assert(err, IsNil)
	hb := sche.(*hotScheduler)
	// Add TiKV stores 1, 2, 3, 4, 5, 6, 7(Down) with region counts 3, 3, 2, 2, 0, 0, 0.
	// Store 7 never reports storage stats below, which makes it count as down.
	storeCount := uint64(7)
	downStoreID := uint64(7)
	tc.AddLabelsStore(1, 3, map[string]string{"zone": "z1", "host": "h1"})
	tc.AddLabelsStore(2, 3, map[string]string{"zone": "z2", "host": "h2"})
	tc.AddLabelsStore(3, 2, map[string]string{"zone": "z3", "host": "h3"})
	tc.AddLabelsStore(4, 2, map[string]string{"zone": "z4", "host": "h4"})
	tc.AddLabelsStore(5, 0, map[string]string{"zone": "z2", "host": "h5"})
	tc.AddLabelsStore(6, 0, map[string]string{"zone": "z5", "host": "h6"})
	tc.AddLabelsStore(7, 0, map[string]string{"zone": "z5", "host": "h7"})
	for i := uint64(1); i <= storeCount; i++ {
		if i != downStoreID {
			tc.UpdateStorageWrittenBytes(i, 0)
		}
	}

	//| region_id | leader_store | follower_store | follower_store | written_bytes |
	//|-----------|--------------|----------------|----------------|---------------|
	//| 1         | 1            | 2              | 3              | 512 KB        |
	//| 2         | 1            | 3              | 4              | 512 KB        |
	//| 3         | 1            | 2              | 4              | 512 KB        |
	//| 4         | 2            |                |                | 100 B         |
	// Region 1, 2 and 3 are hot regions.
	testRegions := []testRegionInfo{
		{1, []uint64{1, 2, 3}, 512 * KB, 5 * KB},
		{2, []uint64{1, 3, 4}, 512 * KB, 5 * KB},
		{3, []uint64{1, 2, 4}, 512 * KB, 5 * KB},
		{4, []uint64{2}, 100, 1},
	}
	addRegionInfo(tc, write, testRegions)
	// Sum the load over all regions and over the hot regions (first three)
	// separately; the expectation below is built from the hot subset only.
	regionBytesSum := 0.0
	regionKeysSum := 0.0
	hotRegionBytesSum := 0.0
	hotRegionKeysSum := 0.0
	for _, r := range testRegions {
		regionBytesSum += r.byteRate
		regionKeysSum += r.keyRate
	}
	for _, r := range testRegions[0:3] {
		hotRegionBytesSum += r.byteRate
		hotRegionKeysSum += r.keyRate
	}
	// All hot-region leaders sit on store 1, so every scheduling round is
	// expected to move a leader away from store 1.
	for i := 0; i < 20; i++ {
		hb.clearPendingInfluence()
		op := hb.Schedule(tc)[0]
		testutil.CheckTransferLeaderFrom(c, op, operator.OpHotRegion, 1)
	}
	//| store_id | write_bytes_rate |
	//|----------|------------------|
	//| 1        | 7.5MB            |
	//| 2        | 4.5MB            |
	//| 3        | 4.5MB            |
	//| 4        | 6MB              |
	//| 5        | 0MB (Evict)      |
	//| 6        | 0MB              |
	//| 7        | n/a (Down)       |
	storesBytes := map[uint64]uint64{
		1: 7.5 * MB * statistics.StoreHeartBeatReportInterval,
		2: 4.5 * MB * statistics.StoreHeartBeatReportInterval,
		3: 4.5 * MB * statistics.StoreHeartBeatReportInterval,
		4: 6 * MB * statistics.StoreHeartBeatReportInterval,
	}
	// Store 5 is marked evict-leader; it must be excluded from the
	// leader-load expectation denominator checked below.
	tc.SetStoreEvictLeader(5, true)
	// NOTE(review): query sum is derived from bytes/100, same as the key sum —
	// presumably a placeholder; confirm against how query rate is reported.
	tikvBytesSum, tikvKeysSum, tikvQuerySum := 0.0, 0.0, 0.0
	for i := uint64(1); i <= storeCount; i++ {
		tikvBytesSum += float64(storesBytes[i]) / 10
		tikvKeysSum += float64(storesBytes[i]/100) / 10
		tikvQuerySum += float64(storesBytes[i]/100) / 10
	}
	for i := uint64(1); i <= storeCount; i++ {
		if i != downStoreID {
			tc.UpdateStorageWrittenBytes(i, storesBytes[i])
		}
	}
	{ // Check the load expect
		aliveTiKVCount := storeCount
		// Two stores cannot take leaders: store 5 (evict leader) and store 7 (down).
		allowLeaderTiKVCount := aliveTiKVCount - 2 // store 5 with evict leader, store 7 is down
		c.Assert(len(hb.Schedule(tc)) == 0, IsFalse)
		// Expected leader load = total hot-region load / number of
		// leader-eligible stores.
		c.Assert(nearlyAbout(
			hb.stLoadInfos[writeLeader][1].LoadPred.Current.ExpByteRate,
			hotRegionBytesSum/float64(allowLeaderTiKVCount)),
			IsTrue)
		c.Assert(nearlyAbout(
			hb.stLoadInfos[writeLeader][1].LoadPred.Current.ExpKeyRate,
			hotRegionKeysSum/float64(allowLeaderTiKVCount)),
			IsTrue)
	}
}

// nearlyAbout reports whether f1 and f2 are within 0.1*KB of each other,
// i.e. |f1 - f2| < 0.1*KB.
//
// Bug fix: the original used `||` between the two one-sided comparisons.
// Since one of (f1-f2) and (f2-f1) is always <= 0, and therefore below the
// positive threshold, the disjunction was true for EVERY pair of inputs,
// making the assertions that rely on this helper vacuous. The conjunction
// below checks both sides, which is equivalent to the absolute difference
// being under the threshold.
func nearlyAbout(f1, f2 float64) bool {
	return f1-f2 < 0.1*KB && f2-f1 < 0.1*KB
}

func (s *testHotReadRegionSchedulerSuite) TestWithKeyRate(c *C) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down

0 comments on commit 7f5012a

Please sign in to comment.