Skip to content

Commit

Permalink
scheduler: reduce GetStore in hot-region-scheduler (#3870) (#3910)
Browse files Browse the repository at this point in the history
Signed-off-by: HunDunDM <[email protected]>

Co-authored-by: 混沌DM <[email protected]>
  • Loading branch information
ti-chi-bot and HunDunDM authored Aug 16, 2021
1 parent 1b3f93e commit 6864bb3
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 19 deletions.
42 changes: 23 additions & 19 deletions server/schedulers/hot_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,11 +190,13 @@ func (h *hotScheduler) dispatch(typ rwType, cluster opt.Cluster) []*operator.Ope
func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) {
h.summaryPendingInfluence()

stores := cluster.GetStores()
storesLoads := cluster.GetStoresLoads()

{ // update read statistics
regionRead := cluster.RegionReadStats()
h.stLoadInfos[readLeader] = summaryStoresLoad(
stores,
storesLoads,
h.pendingSums[readLeader],
regionRead,
Expand All @@ -204,12 +206,14 @@ func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) {
{ // update write statistics
regionWrite := cluster.RegionWriteStats()
h.stLoadInfos[writeLeader] = summaryStoresLoad(
stores,
storesLoads,
h.pendingSums[writeLeader],
regionWrite,
write, core.LeaderKind)

h.stLoadInfos[writePeer] = summaryStoresLoad(
stores,
storesLoads,
h.pendingSums[writePeer],
regionWrite,
Expand Down Expand Up @@ -254,6 +258,7 @@ func (h *hotScheduler) gcRegionPendings() {
// summaryStoresLoad Load information of all available stores.
// it will filtered the hot peer and calculate the current and future stat(byte/key rate,count) for each store
func summaryStoresLoad(
stores []*core.StoreInfo,
storesLoads map[uint64][]float64,
storePendings map[uint64]Influence,
storeHotPeers map[uint64][]*statistics.HotPeerStat,
Expand All @@ -266,7 +271,12 @@ func summaryStoresLoad(
allKeySum := 0.0
allCount := 0.0

for id, loads := range storesLoads {
for _, store := range stores {
id := store.GetID()
loads, ok := storesLoads[id]
if !ok {
continue
}
var byteRate, keyRate float64
switch rwTy {
case read:
Expand Down Expand Up @@ -316,6 +326,7 @@ func summaryStoresLoad(

// Construct store load info.
loadDetail[id] = &storeLoadDetail{
Store: store,
LoadPred: stLoadPred,
HotPeers: hotPeers,
}
Expand Down Expand Up @@ -563,10 +574,6 @@ func (bs *balanceSolver) solve() []*operator.Operator {
func (bs *balanceSolver) filterSrcStores() map[uint64]*storeLoadDetail {
ret := make(map[uint64]*storeLoadDetail)
for id, detail := range bs.stLoadDetail {
if bs.cluster.GetStore(id) == nil {
log.Error("failed to get the source store", zap.Uint64("store-id", id), errs.ZapError(errs.ErrGetSourceStore))
continue
}
if len(detail.HotPeers) == 0 {
continue
}
Expand Down Expand Up @@ -701,12 +708,9 @@ func (bs *balanceSolver) getRegion() *core.RegionInfo {
func (bs *balanceSolver) filterDstStores() map[uint64]*storeLoadDetail {
var (
filters []filter.Filter
candidates []*core.StoreInfo
candidates []*storeLoadDetail
)
srcStore := bs.cluster.GetStore(bs.cur.srcStoreID)
if srcStore == nil {
return nil
}
srcStore := bs.stLoadDetail[bs.cur.srcStoreID].Store
switch bs.opTy {
case movePeer:
filters = []filter.Filter{
Expand All @@ -716,8 +720,8 @@ func (bs *balanceSolver) filterDstStores() map[uint64]*storeLoadDetail {
filter.NewPlacementSafeguard(bs.sche.GetName(), bs.cluster, bs.cur.region, srcStore),
}

for storeID := range bs.stLoadDetail {
candidates = append(candidates, bs.cluster.GetStore(storeID))
for _, detail := range bs.stLoadDetail {
candidates = append(candidates, detail)
}

case transferLeader:
Expand All @@ -729,9 +733,9 @@ func (bs *balanceSolver) filterDstStores() map[uint64]*storeLoadDetail {
filters = append(filters, leaderFilter)
}

for _, store := range bs.cluster.GetFollowerStores(bs.cur.region) {
if _, ok := bs.stLoadDetail[store.GetID()]; ok {
candidates = append(candidates, store)
for _, peer := range bs.cur.region.GetFollowers() {
if detail, ok := bs.stLoadDetail[peer.GetStoreId()]; ok {
candidates = append(candidates, detail)
}
}

Expand All @@ -741,15 +745,15 @@ func (bs *balanceSolver) filterDstStores() map[uint64]*storeLoadDetail {
return bs.pickDstStores(filters, candidates)
}

func (bs *balanceSolver) pickDstStores(filters []filter.Filter, candidates []*core.StoreInfo) map[uint64]*storeLoadDetail {
func (bs *balanceSolver) pickDstStores(filters []filter.Filter, candidates []*storeLoadDetail) map[uint64]*storeLoadDetail {
ret := make(map[uint64]*storeLoadDetail, len(candidates))
dstToleranceRatio := bs.sche.conf.GetDstToleranceRatio()
for _, store := range candidates {
for _, detail := range candidates {
store := detail.Store
if filter.Target(bs.cluster.GetOpts(), store, filters) {
detail := bs.stLoadDetail[store.GetID()]
if detail.LoadPred.max().ByteRate*dstToleranceRatio < detail.LoadPred.Expect.ByteRate &&
detail.LoadPred.max().KeyRate*dstToleranceRatio < detail.LoadPred.Expect.KeyRate {
ret[store.GetID()] = bs.stLoadDetail[store.GetID()]
ret[store.GetID()] = detail
hotSchedulerResultCounter.WithLabelValues("dst-store-succ", strconv.FormatUint(store.GetID(), 10)).Inc()
}
hotSchedulerResultCounter.WithLabelValues("dst-store-fail", strconv.FormatUint(store.GetID(), 10)).Inc()
Expand Down
3 changes: 3 additions & 0 deletions server/schedulers/shuffle_hot_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,17 +132,20 @@ func (s *shuffleHotRegionScheduler) Schedule(cluster opt.Cluster) []*operator.Op
}

func (s *shuffleHotRegionScheduler) dispatch(typ rwType, cluster opt.Cluster) []*operator.Operator {
stores := cluster.GetStores()
storesLoads := cluster.GetStoresLoads()
switch typ {
case read:
s.stLoadInfos[readLeader] = summaryStoresLoad(
stores,
storesLoads,
map[uint64]Influence{},
cluster.RegionReadStats(),
read, core.LeaderKind)
return s.randomSchedule(cluster, s.stLoadInfos[readLeader])
case write:
s.stLoadInfos[writeLeader] = summaryStoresLoad(
stores,
storesLoads,
map[uint64]Influence{},
cluster.RegionWriteStats(),
Expand Down
1 change: 1 addition & 0 deletions server/schedulers/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,7 @@ func maxLoad(a, b *storeLoad) *storeLoad {
}

type storeLoadDetail struct {
Store *core.StoreInfo
LoadPred *storeLoadPred
HotPeers []*statistics.HotPeerStat
}
Expand Down

0 comments on commit 6864bb3

Please sign in to comment.