diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go
index 3bd098a5b4d..1726639d156 100644
--- a/pkg/mock/mockcluster/mockcluster.go
+++ b/pkg/mock/mockcluster/mockcluster.go
@@ -75,6 +75,7 @@ func (mc *Cluster) LoadRegion(regionID uint64, followerIds ...uint64) {
 
 // GetStoresStats gets stores statistics.
 func (mc *Cluster) GetStoresStats() *statistics.StoresStats {
+	mc.StoresStats.FilterUnhealthyStore(mc)
 	return mc.StoresStats
 }
 
@@ -515,6 +516,16 @@ func (mc *Cluster) UpdateStoreStatus(id uint64) {
 	mc.PutStore(newStore)
 }
 
+// SetStoreEvictLeader sets whether the store evicts leaders, by blocking or unblocking it.
+func (mc *Cluster) SetStoreEvictLeader(storeID uint64, enableEvictLeader bool) {
+	store := mc.GetStore(storeID)
+	if enableEvictLeader {
+		mc.PutStore(store.Clone(core.SetStoreBlock()))
+	} else {
+		mc.PutStore(store.Clone(core.SetStoreUnBlock()))
+	}
+}
+
 func (mc *Cluster) newMockRegionInfo(regionID uint64, leaderStoreID uint64, followerStoreIDs ...uint64) *core.RegionInfo {
 	return mc.MockRegionInfo(regionID, leaderStoreID, followerStoreIDs, []uint64{}, nil)
 }
diff --git a/server/schedulers/hot_region.go b/server/schedulers/hot_region.go
index 16042503565..8a89d4e8afb 100644
--- a/server/schedulers/hot_region.go
+++ b/server/schedulers/hot_region.go
@@ -317,6 +317,9 @@ func summaryStoresLoad(
 		if !ok {
 			continue
 		}
+		if kind == core.LeaderKind && store.IsBlocked() {
+			continue
+		}
 		keyRate := storeKeyRate[id]
 
 		// Find all hot peers first
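
Note: the check above is the behavioral change. When summaryStoresLoad summarizes leader load it now skips stores whose block flag is set, and SetStoreEvictLeader in the mock cluster models an evict-leader store by setting exactly that flag. The expected per-store leader load is therefore spread over only the stores that are still allowed to hold leaders, which is what the new test below asserts (worked numbers follow the diff).
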
diff --git a/server/schedulers/hot_test.go b/server/schedulers/hot_test.go
index 08e2bc7c78d..74e98b9fa1e 100644
--- a/server/schedulers/hot_test.go
+++ b/server/schedulers/hot_test.go
@@ -622,6 +622,112 @@ func (s *testHotReadRegionSchedulerSuite) TestByteRateOnly(c *C) {
 	hb.(*hotScheduler).clearPendingInfluence()
 }
 
+func (s *testHotWriteRegionSchedulerSuite) TestExpect(c *C) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	statistics.Denoising = false
+	opt := mockoption.NewScheduleOptions()
+	tc := mockcluster.NewCluster(opt)
+	opt.HotRegionCacheHitsThreshold = 0
+	sche, err := schedule.CreateScheduler(HotWriteRegionType, schedule.NewOperatorController(ctx, nil, nil), core.NewStorage(kv.NewMemoryKV()), nil)
+	c.Assert(err, IsNil)
+	hb := sche.(*hotScheduler)
+	// Add TiKV stores 1, 2, 3, 4, 5, 6, 7 (Down) with region counts 3, 3, 2, 2, 0, 0, 0.
+	storeCount := uint64(7)
+	downStoreID := uint64(7)
+	tc.AddLabelsStore(1, 3, map[string]string{"zone": "z1", "host": "h1"})
+	tc.AddLabelsStore(2, 3, map[string]string{"zone": "z2", "host": "h2"})
+	tc.AddLabelsStore(3, 2, map[string]string{"zone": "z3", "host": "h3"})
+	tc.AddLabelsStore(4, 2, map[string]string{"zone": "z4", "host": "h4"})
+	tc.AddLabelsStore(5, 0, map[string]string{"zone": "z2", "host": "h5"})
+	tc.AddLabelsStore(6, 0, map[string]string{"zone": "z5", "host": "h6"})
+	tc.AddLabelsStore(7, 0, map[string]string{"zone": "z5", "host": "h7"})
+	for i := uint64(1); i <= storeCount; i++ {
+		if i != downStoreID {
+			tc.UpdateStorageWrittenBytes(i, 0)
+		}
+	}
+
+	//| region_id | leader_store | follower_store | follower_store | written_bytes |
+	//|-----------|--------------|----------------|----------------|---------------|
+	//| 1         | 1            | 2              | 3              | 512 KB        |
+	//| 2         | 1            | 3              | 4              | 512 KB        |
+	//| 3         | 1            | 2              | 4              | 512 KB        |
+	//| 4         | 2            |                |                | 100 B         |
+	// Region 1, 2 and 3 are hot regions.
+	testRegions := []testRegionInfo{
+		{1, []uint64{1, 2, 3}, 512 * KB, 5 * KB},
+		{2, []uint64{1, 3, 4}, 512 * KB, 5 * KB},
+		{3, []uint64{1, 2, 4}, 512 * KB, 5 * KB},
+		{4, []uint64{2}, 100, 1},
+	}
+	addRegionInfo(tc, write, testRegions)
+	regionBytesSum := 0.0
+	regionKeysSum := 0.0
+	hotRegionBytesSum := 0.0
+	hotRegionKeysSum := 0.0
+	for _, r := range testRegions {
+		regionBytesSum += r.byteRate
+		regionKeysSum += r.keyRate
+	}
+	for _, r := range testRegions[0:3] {
+		hotRegionBytesSum += r.byteRate
+		hotRegionKeysSum += r.keyRate
+	}
+	for i := 0; i < 20; i++ {
+		hb.clearPendingInfluence()
+		op := hb.Schedule(tc)[0]
+		testutil.CheckTransferLeaderFrom(c, op, operator.OpHotRegion, 1)
+	}
+	//| store_id | write_bytes_rate |
+	//|----------|------------------|
+	//| 1        | 7.5MB            |
+	//| 2        | 4.5MB            |
+	//| 3        | 4.5MB            |
+	//| 4        | 6MB              |
+	//| 5        | 0MB (Evict)      |
+	//| 6        | 0MB              |
+	//| 7        | n/a (Down)       |
+	storesBytes := map[uint64]uint64{
+		1: 7.5 * MB * statistics.StoreHeartBeatReportInterval,
+		2: 4.5 * MB * statistics.StoreHeartBeatReportInterval,
+		3: 4.5 * MB * statistics.StoreHeartBeatReportInterval,
+		4: 6 * MB * statistics.StoreHeartBeatReportInterval,
+	}
+	tc.SetStoreEvictLeader(5, true)
+	tikvBytesSum, tikvKeysSum, tikvQuerySum := 0.0, 0.0, 0.0
+	for i := uint64(1); i <= storeCount; i++ {
+		tikvBytesSum += float64(storesBytes[i]) / 10
+		tikvKeysSum += float64(storesBytes[i]/100) / 10
+		tikvQuerySum += float64(storesBytes[i]/100) / 10
+	}
+	for i := uint64(1); i <= storeCount; i++ {
+		if i != downStoreID {
+			tc.UpdateStorageWrittenBytes(i, storesBytes[i])
+		}
+	}
+	{ // Check the load expectation
+		aliveTiKVCount := storeCount
+		allowLeaderTiKVCount := aliveTiKVCount - 2 // store 5 evicts leaders, store 7 is down
+		c.Assert(len(hb.Schedule(tc)) == 0, IsFalse)
+		c.Assert(nearlyAbout(
+			hb.stLoadInfos[writeLeader][1].LoadPred.Current.ExpByteRate,
+			hotRegionBytesSum/float64(allowLeaderTiKVCount)),
+			IsTrue)
+		c.Assert(nearlyAbout(
+			hb.stLoadInfos[writeLeader][1].LoadPred.Current.ExpKeyRate,
+			hotRegionKeysSum/float64(allowLeaderTiKVCount)),
+			IsTrue)
+	}
+}
+
+func nearlyAbout(f1, f2 float64) bool {
+	if f1-f2 < 0.1*KB && f2-f1 < 0.1*KB {
+		return true
+	}
+	return false
+}
+
 func (s *testHotReadRegionSchedulerSuite) TestWithKeyRate(c *C) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
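
For reference, the expectation asserted at the end of TestExpect works out as follows. This is a minimal standalone sketch using the same numbers as the test; the identifiers are local to the sketch, not part of the PD code.

package main

import "fmt"

func main() {
	const KB = 1024.0
	// Regions 1, 2 and 3 are hot, each written at 512 KB/s (5 KB/s of keys),
	// and all three leaders sit on store 1.
	hotRegionBytesSum := 3 * 512 * KB
	hotRegionKeysSum := 3 * 5 * KB
	// Store 5 evicts leaders and store 7 is down, so 5 of the 7 stores may hold leaders.
	allowLeaderStores := 5.0

	fmt.Printf("ExpByteRate ~= %.1f KB/s\n", hotRegionBytesSum/allowLeaderStores/KB) // 307.2
	fmt.Printf("ExpKeyRate  ~= %.1f KB/s\n", hotRegionKeysSum/allowLeaderStores/KB)  // 3.0
}

The test asserts that the scheduler's ExpByteRate and ExpKeyRate for store 1 match these rates to within 0.1 KB via nearlyAbout.
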