scheduler: balance-hot-region-scheduler supports TiFlash
Signed-off-by: HunDunDM <[email protected]>
HunDunDM committed Aug 3, 2021
1 parent c06414f commit ef48dc9
Showing 6 changed files with 310 additions and 207 deletions.
server/schedulers/hot_region.go: 232 changes (66 additions, 166 deletions)
@@ -81,10 +81,13 @@ type hotScheduler struct {
// be selected if its owner region is tracked in this attribute.
regionPendings map[uint64]*pendingInfluence

// store information, including pending Influence by resource type
// It is recalculated every time Schedule runs.
stInfos map[uint64]*storeInfo
// temporary states but exported to API or metrics
// It is recalculated every time Schedule runs.
stLoadInfos [resourceTypeLen]map[uint64]*storeLoadDetail
// This stores the pending Influence for each store by resource type.
pendingSums map[uint64]*Influence

// config of hot scheduler
conf *hotRegionSchedulerConfig
}
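
Several hunks below lean on store bookkeeping that this file no longer defines; the helpers presumably live in one of the other five changed files. The following is a minimal sketch of the assumed shapes, inferred only from how this diff uses them (Info.Store, Info.IsTiFlash, PendingSum, addInfluence); anything beyond those usages is a guess, not the commit's code.

// Sketch (assumed, not from the commit): the per-store info shared by the
// scheduler, keyed by store ID in hotScheduler.stInfos.
type storeInfo struct {
	Store      *core.StoreInfo // the underlying store
	IsTiFlash  bool            // set for stores running the TiFlash engine
	PendingSum *Influence      // summed pending influence, rebuilt each Schedule
}

// addInfluence accumulates a weighted influence into PendingSum.
func (s *storeInfo) addInfluence(infl *Influence, w float64) {
	if s.PendingSum == nil {
		s.PendingSum = &Influence{Loads: make([]float64, len(infl.Loads))}
	}
	for i := range infl.Loads {
		s.PendingSum.Loads[i] += infl.Loads[i] * w
	}
	s.PendingSum.Count += infl.Count * w
}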
@@ -155,40 +158,34 @@ func (h *hotScheduler) dispatch(typ rwType, cluster opt.Cluster) []*operator.Operator {
// prepareForBalance calculates the summary of pending Influence for each store and prepares the load detail for
// each store
func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) {
h.stInfos = summaryStoreInfos(cluster)
h.summaryPendingInfluence()

stores := cluster.GetStores()
storesLoads := cluster.GetStoresLoads()

{ // update read statistics
regionRead := cluster.RegionReadStats()
h.stLoadInfos[readLeader] = summaryStoresLoad(
stores,
h.stInfos,
storesLoads,
h.pendingSums,
regionRead,
read, core.LeaderKind)
h.stLoadInfos[readPeer] = summaryStoresLoad(
stores,
h.stInfos,
storesLoads,
h.pendingSums,
regionRead,
read, core.RegionKind)
}

{ // update write statistics
regionWrite := cluster.RegionWriteStats()
h.stLoadInfos[writeLeader] = summaryStoresLoad(
stores,
h.stInfos,
storesLoads,
h.pendingSums,
regionWrite,
write, core.LeaderKind)

h.stLoadInfos[writePeer] = summaryStoresLoad(
stores,
h.stInfos,
storesLoads,
h.pendingSums,
regionWrite,
write, core.RegionKind)
}
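
prepareForBalance now builds stInfos once, up front, so the pending-influence summary and all four load summaries share it. summaryStoreInfos itself is outside this hunk; a plausible sketch, assuming TiFlash stores are recognized by their engine store label:

// Hypothetical body for summaryStoreInfos; the actual implementation is not
// shown in this diff. The "engine" label check is an assumption.
func summaryStoreInfos(cluster opt.Cluster) map[uint64]*storeInfo {
	stores := cluster.GetStores()
	infos := make(map[uint64]*storeInfo, len(stores))
	for _, store := range stores {
		infos[store.GetID()] = &storeInfo{
			Store:     store,
			IsTiFlash: store.GetLabelValue("engine") == "tiflash",
		}
	}
	return infos
}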
@@ -198,10 +195,18 @@ func (h *hotScheduler) prepareForBalance(cluster opt.Cluster) {
// and cleans the region from regionPendings once its operator has ended.
// It scales each dim's rate or count to `weight` times the origin value.
func (h *hotScheduler) summaryPendingInfluence() {
maxZombieDur := h.conf.GetMaxZombieDuration()
ret := make(map[uint64]*Influence)
storeZombieDur := h.conf.GetStoreStatZombieDuration()
regionsZombieDur := h.conf.GetRegionsStatZombieDuration()

for id, p := range h.regionPendings {
from := h.stInfos[p.from]
to := h.stInfos[p.to]
maxZombieDur := storeZombieDur
if (from != nil && from.IsTiFlash) || (to != nil && to.IsTiFlash) {
maxZombieDur = regionsZombieDur
}
weight, needGC := h.calcPendingInfluence(p.op, maxZombieDur)

if needGC {
delete(h.regionPendings, id)
schedulerStatus.WithLabelValues(h.GetName(), "pending_op_infos").Dec()
@@ -212,145 +217,14 @@ func (h *hotScheduler) summaryPendingInfluence() {
zap.Duration("zombie", maxZombieDur))
continue
}
if _, ok := ret[p.to]; !ok {
ret[p.to] = &Influence{Loads: make([]float64, len(p.origin.Loads))}
}
ret[p.to] = ret[p.to].add(&p.origin, weight)
if _, ok := ret[p.from]; !ok {
ret[p.from] = &Influence{Loads: make([]float64, len(p.origin.Loads))}
}
ret[p.from] = ret[p.from].add(&p.origin, -weight)
if from != nil && weight > 0 {
from.addInfluence(&p.origin, -weight)
}
if to != nil && weight > 0 {
to.addInfluence(&p.origin, weight)
}
}
h.pendingSums = ret
}
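
The net effect of this rewrite: instead of accumulating into a detached pendingSums map, each pending operator now writes its influence straight onto the two storeInfo entries it touches, and TiFlash endpoints use the region-statistics zombie duration instead of the store-statistics one, presumably because TiFlash write load is tracked through region stats with a different refresh cadence. A toy walkthrough with made-up numbers (the function name is ours, not the commit's):

// examplePendingShift shows the bookkeeping for one pending peer move:
// an origin influence of 100 on ByteDim at weight 1.0 lowers the source
// store's prediction by 100 and raises the destination's by 100.
func examplePendingShift(from, to *storeInfo) {
	origin := Influence{Loads: []float64{100, 0, 0}, Count: 1} // Byte, Key, Query dims
	from.addInfluence(&origin, -1.0)                           // source is expected to shed it
	to.addInfluence(&origin, +1.0)                             // destination to absorb it
}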

// summaryStoresLoad summarizes the load information of all available stores.
// It filters hot peers and calculates the current and future stats (rate, count) for each store.
func summaryStoresLoad(
stores []*core.StoreInfo,
storesLoads map[uint64][]float64,
storePendings map[uint64]*Influence,
storeHotPeers map[uint64][]*statistics.HotPeerStat,
rwTy rwType,
kind core.ResourceKind,
) map[uint64]*storeLoadDetail {
// loadDetail stores the storeID -> hotPeers stat and its current and future stat(rate,count)
loadDetail := make(map[uint64]*storeLoadDetail, len(storesLoads))
allLoadSum := make([]float64, statistics.DimLen)
allCount := 0.0

for _, store := range stores {
id := store.GetID()
storeLoads, ok := storesLoads[id]
if !ok {
continue
}
loads := make([]float64, statistics.DimLen)
switch rwTy {
case read:
loads[statistics.ByteDim] = storeLoads[statistics.StoreReadBytes]
loads[statistics.KeyDim] = storeLoads[statistics.StoreReadKeys]
loads[statistics.QueryDim] = storeLoads[statistics.StoreReadQuery]
case write:
loads[statistics.ByteDim] = storeLoads[statistics.StoreWriteBytes]
loads[statistics.KeyDim] = storeLoads[statistics.StoreWriteKeys]
loads[statistics.QueryDim] = storeLoads[statistics.StoreWriteQuery]
}
// Find all hot peers first
var hotPeers []*statistics.HotPeerStat
{
peerLoadSum := make([]float64, statistics.DimLen)
// TODO: To remove `filterHotPeers`, we need to:
// HotLeaders consider `Write{Bytes,Keys}`, so when we schedule `writeLeader`, all peers are leader.
for _, peer := range filterHotPeers(kind, storeHotPeers[id]) {
for i := range peerLoadSum {
peerLoadSum[i] += peer.GetLoad(getRegionStatKind(rwTy, i))
}
hotPeers = append(hotPeers, peer.Clone())
}
// Use sum of hot peers to estimate leader-only byte rate.
// For write requests, Write{Bytes, Keys} is applied to all Peers at the same time, while the Leader and Follower are under different loads (usually the Leader consumes more CPU).
// But none of the current dimensions reflects this difference, so we create a new dimension to reflect it.
if kind == core.LeaderKind && rwTy == write {
loads = peerLoadSum
}

// Metric for debug.
{
ty := "byte-rate-" + rwTy.String() + "-" + kind.String()
hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(peerLoadSum[statistics.ByteDim])
}
{
ty := "key-rate-" + rwTy.String() + "-" + kind.String()
hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(peerLoadSum[statistics.KeyDim])
}
{
ty := "query-rate-" + rwTy.String() + "-" + kind.String()
hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(peerLoadSum[statistics.QueryDim])
}
}
for i := range allLoadSum {
allLoadSum[i] += loads[i]
}
allCount += float64(len(hotPeers))

// Build store load prediction from current load and pending influence.
stLoadPred := (&storeLoad{
Loads: loads,
Count: float64(len(hotPeers)),
}).ToLoadPred(rwTy, storePendings[id])

// Construct store load info.
loadDetail[id] = &storeLoadDetail{
Store: store,
LoadPred: stLoadPred,
HotPeers: hotPeers,
}
}
storeLen := float64(len(storesLoads))
// store expectation rate and count for each store-load detail.
for id, detail := range loadDetail {
expectLoads := make([]float64, len(allLoadSum))
for i := range expectLoads {
expectLoads[i] = allLoadSum[i] / storeLen
}
expectCount := allCount / storeLen
detail.LoadPred.Expect.Loads = expectLoads
detail.LoadPred.Expect.Count = expectCount
// Debug
{
ty := "exp-byte-rate-" + rwTy.String() + "-" + kind.String()
hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(expectLoads[statistics.ByteDim])
}
{
ty := "exp-key-rate-" + rwTy.String() + "-" + kind.String()
hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(expectLoads[statistics.KeyDim])
}
{
ty := "exp-query-rate-" + rwTy.String() + "-" + kind.String()
hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(expectLoads[statistics.QueryDim])
}
{
ty := "exp-count-rate-" + rwTy.String() + "-" + kind.String()
hotPeerSummary.WithLabelValues(ty, fmt.Sprintf("%v", id)).Set(expectCount)
}
}
return loadDetail
}
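
For reference, the expectation logic deleted above (and presumably relocated to one of the other changed files) is a per-dimension arithmetic mean; a compact restatement with a worked example:

// expectPerStore restates the removed expectation rule: each store is
// expected to carry the mean of the summed load. E.g. three stores with
// byte rates 30, 60, and 90 MB/s sum to 180, so each one's expected
// byte rate is 180/3 = 60 MB/s.
func expectPerStore(allLoadSum []float64, storeCount int) []float64 {
	expect := make([]float64, len(allLoadSum))
	for i, sum := range allLoadSum {
		expect[i] = sum / float64(storeCount)
	}
	return expect
}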

// filterHotPeers filters out the peers whose hot degree is less than minHotDegree
func filterHotPeers(
kind core.ResourceKind,
peers []*statistics.HotPeerStat,
) []*statistics.HotPeerStat {
ret := make([]*statistics.HotPeerStat, 0, len(peers))
for _, peer := range peers {
if kind == core.LeaderKind && !peer.IsLeader() {
continue
}
ret = append(ret, peer)
}
return ret
}

func (h *hotScheduler) addPendingInfluence(op *operator.Operator, srcStore, dstStore uint64, infl Influence) bool {
@@ -576,12 +450,25 @@ func (bs *balanceSolver) solve() []*operator.Operator {
// its expectation * ratio, the store would be selected as hot source store
func (bs *balanceSolver) filterSrcStores() map[uint64]*storeLoadDetail {
ret := make(map[uint64]*storeLoadDetail)
confSrcToleranceRatio := bs.sche.conf.GetSrcToleranceRatio()
confEnableForTiFlash := bs.sche.conf.GetEnableForTiFlash()
for id, detail := range bs.stLoadDetail {
srcToleranceRatio := confSrcToleranceRatio
if detail.Info.IsTiFlash {
if !confEnableForTiFlash {
continue
}
if bs.rwTy != write || bs.opTy != movePeer {
continue
}
srcToleranceRatio += 0.1
}
if len(detail.HotPeers) == 0 {
continue
}

minLoad := detail.LoadPred.min()
if bs.checkSrcByDimPriorityAndTolerance(minLoad, &detail.LoadPred.Expect) {
if bs.checkSrcByDimPriorityAndTolerance(minLoad, &detail.LoadPred.Expect, srcToleranceRatio) {
ret[id] = detail
hotSchedulerResultCounter.WithLabelValues("src-store-succ", strconv.FormatUint(id, 10)).Inc()
} else {
@@ -591,11 +478,11 @@ func (bs *balanceSolver) filterSrcStores() map[uint64]*storeLoadDetail {
return ret
}

func (bs *balanceSolver) checkSrcByDimPriorityAndTolerance(minLoad, expectLoad *storeLoad) bool {
func (bs *balanceSolver) checkSrcByDimPriorityAndTolerance(minLoad, expectLoad *storeLoad, toleranceRatio float64) bool {
if bs.sche.conf.StrictPickingStore {
return slice.AllOf(minLoad.Loads, func(i int) bool {
if bs.isSelectedDim(i) {
return minLoad.Loads[i] > bs.sche.conf.GetSrcToleranceRatio()*expectLoad.Loads[i]
return minLoad.Loads[i] > toleranceRatio*expectLoad.Loads[i]
}
return true
})
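
checkSrcByDimPriorityAndTolerance now takes the ratio as a parameter, which lets the caller hold TiFlash stores to a higher bar (ratio + 0.1). In strict mode the predicate reduces to the sketch below; the concrete numbers are illustrative, not the commit's defaults.

// qualifiesAsSource (illustrative restatement): every prioritized dimension
// of the store's minimal predicted load must exceed expectation * ratio.
// With an expected byte rate of 100 and ratio 1.05, a source needs a minimal
// predicted byte rate above 105; a TiFlash source at ratio 1.15 needs above 115.
func qualifiesAsSource(minLoad, expect []float64, isSelectedDim func(int) bool, ratio float64) bool {
	for i := range minLoad {
		if isSelectedDim(i) && minLoad[i] <= ratio*expect[i] {
			return false
		}
	}
	return true
}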
@@ -733,7 +620,7 @@ func (bs *balanceSolver) filterDstStores() map[uint64]*storeLoadDetail {
filters []filter.Filter
candidates []*storeLoadDetail
)
srcStore := bs.stLoadDetail[bs.cur.srcStoreID].Store
srcStore := bs.stLoadDetail[bs.cur.srcStoreID].Info.Store
switch bs.opTy {
case movePeer:
filters = []filter.Filter{
@@ -770,11 +657,24 @@ func (bs *balanceSolver) filterDstStores() map[uint64]*storeLoadDetail {

func (bs *balanceSolver) pickDstStores(filters []filter.Filter, candidates []*storeLoadDetail) map[uint64]*storeLoadDetail {
ret := make(map[uint64]*storeLoadDetail, len(candidates))
confDstToleranceRatio := bs.sche.conf.GetDstToleranceRatio()
confEnableForTiFlash := bs.sche.conf.GetEnableForTiFlash()
for _, detail := range candidates {
id := detail.Store.GetID()
maxLoad := detail.LoadPred.max()
if filter.Target(bs.cluster.GetOpts(), detail.Store, filters) {
if bs.checkDstByPriorityAndTolerance(maxLoad, &detail.LoadPred.Expect) {
store := detail.Info.Store
dstToleranceRatio := confDstToleranceRatio
if detail.Info.IsTiFlash {
if !confEnableForTiFlash {
continue
}
if bs.rwTy != write || bs.opTy != movePeer {
continue
}
dstToleranceRatio += 0.1
}
if filter.Target(bs.cluster.GetOpts(), store, filters) {
id := store.GetID()
maxLoad := detail.LoadPred.max()
if bs.checkDstByPriorityAndTolerance(maxLoad, &detail.LoadPred.Expect, dstToleranceRatio) {
ret[id] = detail
hotSchedulerResultCounter.WithLabelValues("src-store-succ", strconv.FormatUint(id, 10)).Inc()
} else {
@@ -785,17 +685,16 @@ func (bs *balanceSolver) pickDstStores(filters []filter.Filter, candidates []*storeLoadDetail) map[uint64]*storeLoadDetail {
return ret
}
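
filterSrcStores and pickDstStores gate TiFlash the same way: unless the TiFlash switch is enabled and the flow is write with a movePeer operator, the store is skipped outright. That restriction matches how TiFlash works: its replicas are Raft learners, so leader transfer and read-leader scheduling never apply. The repeated condition could be folded into a small helper (hypothetical, not in the commit):

// tiflashAllowed is a hypothetical helper for the gating duplicated in
// filterSrcStores and pickDstStores: TiFlash participates only in
// write-flow peer moves, and only when the TiFlash switch is on.
func tiflashAllowed(enabled bool, rwTy rwType, opTy opType) bool {
	return enabled && rwTy == write && opTy == movePeer
}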

func (bs *balanceSolver) checkDstByPriorityAndTolerance(maxLoad, expect *storeLoad) bool {
dstToleranceRatio := bs.sche.conf.GetDstToleranceRatio()
func (bs *balanceSolver) checkDstByPriorityAndTolerance(maxLoad, expect *storeLoad, toleranceRatio float64) bool {
if bs.sche.conf.StrictPickingStore {
return slice.AllOf(maxLoad.Loads, func(i int) bool {
if bs.isSelectedDim(i) {
return maxLoad.Loads[i]*dstToleranceRatio < expect.Loads[i]
return maxLoad.Loads[i]*toleranceRatio < expect.Loads[i]
}
return true
})
}
return maxLoad.Loads[bs.firstPriority]*dstToleranceRatio < expect.Loads[bs.firstPriority]
return maxLoad.Loads[bs.firstPriority]*toleranceRatio < expect.Loads[bs.firstPriority]
}
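
The destination check mirrors the source check with the inequality flipped: the maximal predicted load, inflated by the ratio, must stay under the expectation. On a single dimension, with illustrative numbers:

// fitsAsTarget (illustrative): with an expected byte rate of 100, a regular
// target at ratio 1.05 must keep its max predicted rate below 100/1.05 ≈ 95.2;
// a TiFlash target at ratio 1.15 must stay below 100/1.15 ≈ 87.0.
func fitsAsTarget(maxLoad, expect, ratio float64) bool {
	return maxLoad*ratio < expect
}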

// calcProgressiveRank calculates `bs.cur.progressiveRank`.
Expand Down Expand Up @@ -1152,9 +1051,11 @@ func (h *hotScheduler) GetHotStatus(typ string) *statistics.StoreHotPeersInfos {
func (h *hotScheduler) GetPendingInfluence() map[uint64]*Influence {
h.RLock()
defer h.RUnlock()
ret := make(map[uint64]*Influence, len(h.pendingSums))
for id, infl := range h.pendingSums {
ret[id] = infl.add(infl, 0) // copy
ret := make(map[uint64]*Influence, len(h.stInfos))
for id, info := range h.stInfos {
if info.PendingSum != nil {
ret[id] = info.PendingSum
}
}
return ret
}
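
A subtlety in the rewritten accessor: the removed code returned defensive copies via infl.add(infl, 0), while the new code returns the PendingSum pointers themselves, so callers share the scheduler's live structs until the next Schedule rebuilds stInfos. A caller that wants a detached snapshot can still apply the old trick (hypothetical usage, not part of the commit):

// Hypothetical caller-side snapshot, reusing the add-with-zero-weight copy
// idiom from the removed code.
snapshots := make(map[uint64]*Influence)
for id, infl := range h.GetPendingInfluence() {
	snapshots[id] = infl.add(infl, 0) // detaches from the scheduler's struct
}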
Expand Down Expand Up @@ -1184,7 +1085,6 @@ func (h *hotScheduler) calcPendingInfluence(op *operator.Operator, maxZombieDur
}

func (h *hotScheduler) clearPendingInfluence() {
h.pendingSums = nil
h.regionPendings = make(map[uint64]*pendingInfluence)
}
