diff --git a/server/schedulers/hot_region.go b/server/schedulers/hot_region.go
index 19ce6a53d43..a7ab67158f5 100644
--- a/server/schedulers/hot_region.go
+++ b/server/schedulers/hot_region.go
@@ -190,11 +190,13 @@ func (h *hotScheduler) dispatch(typ rwType, cluster opt.Cluster) []*operator.Ope
 func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) {
 	h.summaryPendingInfluence()
 
+	stores := cluster.GetStores()
 	storesLoads := cluster.GetStoresLoads()
 
 	{ // update read statistics
 		regionRead := cluster.RegionReadStats()
 		h.stLoadInfos[readLeader] = summaryStoresLoad(
+			stores,
 			storesLoads,
 			h.pendingSums[readLeader],
 			regionRead,
@@ -204,12 +206,14 @@ func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) {
 	{ // update write statistics
 		regionWrite := cluster.RegionWriteStats()
 		h.stLoadInfos[writeLeader] = summaryStoresLoad(
+			stores,
 			storesLoads,
 			h.pendingSums[writeLeader],
 			regionWrite,
 			write, core.LeaderKind)
 
 		h.stLoadInfos[writePeer] = summaryStoresLoad(
+			stores,
 			storesLoads,
 			h.pendingSums[writePeer],
 			regionWrite,
@@ -254,6 +258,7 @@ func (h *hotScheduler) gcRegionPendings() {
 // summaryStoresLoad Load information of all available stores.
 // it will filtered the hot peer and calculate the current and future stat(byte/key rate,count) for each store
 func summaryStoresLoad(
+	stores []*core.StoreInfo,
 	storesLoads map[uint64][]float64,
 	storePendings map[uint64]Influence,
 	storeHotPeers map[uint64][]*statistics.HotPeerStat,
@@ -266,7 +271,12 @@ func summaryStoresLoad(
 	allKeySum := 0.0
 	allCount := 0.0
 
-	for id, loads := range storesLoads {
+	for _, store := range stores {
+		id := store.GetID()
+		loads, ok := storesLoads[id]
+		if !ok {
+			continue
+		}
 		var byteRate, keyRate float64
 		switch rwTy {
 		case read:
@@ -316,6 +326,7 @@ func summaryStoresLoad(
 
 		// Construct store load info.
 		loadDetail[id] = &storeLoadDetail{
+			Store:    store,
 			LoadPred: stLoadPred,
 			HotPeers: hotPeers,
 		}
@@ -563,10 +574,6 @@ func (bs *balanceSolver) solve() []*operator.Operator {
 func (bs *balanceSolver) filterSrcStores() map[uint64]*storeLoadDetail {
 	ret := make(map[uint64]*storeLoadDetail)
 	for id, detail := range bs.stLoadDetail {
-		if bs.cluster.GetStore(id) == nil {
-			log.Error("failed to get the source store", zap.Uint64("store-id", id), errs.ZapError(errs.ErrGetSourceStore))
-			continue
-		}
 		if len(detail.HotPeers) == 0 {
 			continue
 		}
@@ -701,12 +708,9 @@ func (bs *balanceSolver) getRegion() *core.RegionInfo {
 func (bs *balanceSolver) filterDstStores() map[uint64]*storeLoadDetail {
 	var (
 		filters    []filter.Filter
-		candidates []*core.StoreInfo
+		candidates []*storeLoadDetail
 	)
-	srcStore := bs.cluster.GetStore(bs.cur.srcStoreID)
-	if srcStore == nil {
-		return nil
-	}
+	srcStore := bs.stLoadDetail[bs.cur.srcStoreID].Store
 	switch bs.opTy {
 	case movePeer:
 		filters = []filter.Filter{
@@ -716,8 +720,8 @@ func (bs *balanceSolver) filterDstStores() map[uint64]*storeLoadDetail {
 			filter.NewPlacementSafeguard(bs.sche.GetName(), bs.cluster, bs.cur.region, srcStore),
 		}
 
-		for storeID := range bs.stLoadDetail {
-			candidates = append(candidates, bs.cluster.GetStore(storeID))
+		for _, detail := range bs.stLoadDetail {
+			candidates = append(candidates, detail)
 		}
 
 	case transferLeader:
@@ -729,9 +733,9 @@ func (bs *balanceSolver) filterDstStores() map[uint64]*storeLoadDetail {
 			filters = append(filters, leaderFilter)
 		}
 
-		for _, store := range bs.cluster.GetFollowerStores(bs.cur.region) {
-			if _, ok := bs.stLoadDetail[store.GetID()]; ok {
-				candidates = append(candidates, store)
+		for _, peer := range bs.cur.region.GetFollowers() {
+			if detail, ok := bs.stLoadDetail[peer.GetStoreId()]; ok {
+				candidates = append(candidates, detail)
 			}
 		}
 
@@ -741,15 +745,15 @@ func (bs *balanceSolver) filterDstStores() map[uint64]*storeLoadDetail {
 	return bs.pickDstStores(filters, candidates)
 }
 
-func (bs *balanceSolver) pickDstStores(filters []filter.Filter, candidates []*core.StoreInfo) map[uint64]*storeLoadDetail {
+func (bs *balanceSolver) pickDstStores(filters []filter.Filter, candidates []*storeLoadDetail) map[uint64]*storeLoadDetail {
 	ret := make(map[uint64]*storeLoadDetail, len(candidates))
 	dstToleranceRatio := bs.sche.conf.GetDstToleranceRatio()
-	for _, store := range candidates {
+	for _, detail := range candidates {
+		store := detail.Store
 		if filter.Target(bs.cluster.GetOpts(), store, filters) {
-			detail := bs.stLoadDetail[store.GetID()]
 			if detail.LoadPred.max().ByteRate*dstToleranceRatio < detail.LoadPred.Expect.ByteRate &&
 				detail.LoadPred.max().KeyRate*dstToleranceRatio < detail.LoadPred.Expect.KeyRate {
-				ret[store.GetID()] = bs.stLoadDetail[store.GetID()]
+				ret[store.GetID()] = detail
 				hotSchedulerResultCounter.WithLabelValues("dst-store-succ", strconv.FormatUint(store.GetID(), 10)).Inc()
 			}
 			hotSchedulerResultCounter.WithLabelValues("dst-store-fail", strconv.FormatUint(store.GetID(), 10)).Inc()
diff --git a/server/schedulers/shuffle_hot_region.go b/server/schedulers/shuffle_hot_region.go
index 7269d4b1190..13c3d90638a 100644
--- a/server/schedulers/shuffle_hot_region.go
+++ b/server/schedulers/shuffle_hot_region.go
@@ -132,10 +132,12 @@ func (s *shuffleHotRegionScheduler) Schedule(cluster opt.Cluster) []*operator.Op
 }
 
 func (s *shuffleHotRegionScheduler) dispatch(typ rwType, cluster opt.Cluster) []*operator.Operator {
+	stores := cluster.GetStores()
 	storesLoads := cluster.GetStoresLoads()
 	switch typ {
 	case read:
 		s.stLoadInfos[readLeader] = summaryStoresLoad(
+			stores,
 			storesLoads,
 			map[uint64]Influence{},
 			cluster.RegionReadStats(),
@@ -143,6 +145,7 @@ func (s *shuffleHotRegionScheduler) dispatch(typ rwType, cluster opt.Cluster) []
 		return s.randomSchedule(cluster, s.stLoadInfos[readLeader])
 	case write:
 		s.stLoadInfos[writeLeader] = summaryStoresLoad(
+			stores,
 			storesLoads,
 			map[uint64]Influence{},
 			cluster.RegionWriteStats(),
diff --git a/server/schedulers/utils.go b/server/schedulers/utils.go
index af04b267897..0abf82b9810 100644
--- a/server/schedulers/utils.go
+++ b/server/schedulers/utils.go
@@ -326,6 +326,7 @@ func maxLoad(a, b *storeLoad) *storeLoad {
 }
 
 type storeLoadDetail struct {
+	Store    *core.StoreInfo
 	LoadPred *storeLoadPred
 	HotPeers []*statistics.HotPeerStat
 }
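For reviewers, a minimal, self-contained sketch of the pattern this patch introduces. The types below are simplified stand-ins, not the real `core.StoreInfo` or the scheduler's load structures: `summaryStoresLoad` now ranges over the store list from `cluster.GetStores()`, skips stores that report no loads, and caches the store on each `storeLoadDetail`, so downstream phases (`filterSrcStores`, `filterDstStores`, `pickDstStores`) can read `detail.Store` instead of calling `bs.cluster.GetStore(id)` and handling a nil result.

```go
// Hypothetical sketch of the refactoring above; simplified types, not the pd packages.
package main

import "fmt"

// StoreInfo stands in for core.StoreInfo.
type StoreInfo struct{ id uint64 }

func (s *StoreInfo) GetID() uint64 { return s.id }

// storeLoadDetail mirrors the struct in server/schedulers/utils.go,
// with the newly added Store field; the load fields are placeholders.
type storeLoadDetail struct {
	Store    *StoreInfo
	ByteRate float64
}

// summaryStoresLoad sketches the new iteration order: walk the store list,
// look up its loads, and continue when no load statistics were reported.
func summaryStoresLoad(stores []*StoreInfo, storesLoads map[uint64][]float64) map[uint64]*storeLoadDetail {
	loadDetail := make(map[uint64]*storeLoadDetail, len(stores))
	for _, store := range stores {
		id := store.GetID()
		loads, ok := storesLoads[id]
		if !ok {
			continue // store is known to the cluster but has no load stats yet
		}
		loadDetail[id] = &storeLoadDetail{
			Store:    store,
			ByteRate: loads[0],
		}
	}
	return loadDetail
}

func main() {
	stores := []*StoreInfo{{id: 1}, {id: 2}, {id: 3}}
	loads := map[uint64][]float64{1: {100}, 2: {200}} // store 3 reports no loads
	for id, detail := range summaryStoresLoad(stores, loads) {
		// Downstream code reads detail.Store directly; no nil check on a
		// separate cluster lookup is needed any more.
		fmt.Printf("store %d (cached %d): byte rate %.0f\n", id, detail.Store.GetID(), detail.ByteRate)
	}
}
```

The effect of the change is that each store is resolved exactly once per balance round, and candidate selection then operates purely on `storeLoadDetail` values rather than re-fetching stores by ID.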