scheduler: balance-hot-region-scheduler supports TiFlash (tikv#3900)
Signed-off-by: HunDunDM <[email protected]>
HunDunDM committed Aug 10, 2021
1 parent e0df447 commit 4c54226
Showing 12 changed files with 704 additions and 291 deletions.
56 changes: 36 additions & 20 deletions pkg/mock/mockcluster/mockcluster.go
@@ -85,14 +85,15 @@ func (mc *Cluster) ScanRegions(startKey, endKey []byte, limit int) []*core.RegionInfo {
 }
 
 // LoadRegion puts region info without leader
-func (mc *Cluster) LoadRegion(regionID uint64, followerIds ...uint64) {
+func (mc *Cluster) LoadRegion(regionID uint64, peerStoreIDs ...uint64) {
 	// regions load from etcd will have no leader
-	r := mc.newMockRegionInfo(regionID, 0, followerIds...).Clone(core.WithLeader(nil))
+	r := mc.newMockRegionInfo(regionID, 0, peerStoreIDs...).Clone(core.WithLeader(nil))
 	mc.PutRegion(r)
 }
 
 // GetStoresLoads gets stores load statistics.
 func (mc *Cluster) GetStoresLoads() map[uint64][]float64 {
+	mc.HotStat.FilterUnhealthyStore(mc)
 	return mc.HotStat.GetStoresLoads()
 }
 
@@ -304,8 +305,8 @@ func (mc *Cluster) AddLabelsStore(storeID uint64, regionCount int, labels map[string]string) {
 }
 
 // AddLeaderRegion adds region with specified leader and followers.
-func (mc *Cluster) AddLeaderRegion(regionID uint64, leaderStoreID uint64, followerStoreIDs ...uint64) *core.RegionInfo {
-	origin := mc.newMockRegionInfo(regionID, leaderStoreID, followerStoreIDs...)
+func (mc *Cluster) AddLeaderRegion(regionID uint64, leaderStoreID uint64, otherPeerStoreIDs ...uint64) *core.RegionInfo {
+	origin := mc.newMockRegionInfo(regionID, leaderStoreID, otherPeerStoreIDs...)
 	region := origin.Clone(core.SetApproximateSize(defaultRegionSize/mb), core.SetApproximateKeys(10))
 	mc.PutRegion(region)
 	return region
@@ -320,8 +321,8 @@ func (mc *Cluster) AddRegionWithLearner(regionID uint64, leaderStoreID uint64, f
 }
 
 // AddLeaderRegionWithRange adds region with specified leader, followers and key range.
-func (mc *Cluster) AddLeaderRegionWithRange(regionID uint64, startKey string, endKey string, leaderID uint64, followerIds ...uint64) {
-	o := mc.newMockRegionInfo(regionID, leaderID, followerIds...)
+func (mc *Cluster) AddLeaderRegionWithRange(regionID uint64, startKey string, endKey string, leaderStoreID uint64, otherPeerStoreIDs ...uint64) {
+	o := mc.newMockRegionInfo(regionID, leaderStoreID, otherPeerStoreIDs...)
 	r := o.Clone(
 		core.WithStartKey([]byte(startKey)),
 		core.WithEndKey([]byte(endKey)),
@@ -331,11 +332,11 @@ func (mc *Cluster) AddLeaderRegionWithRange(regionID uint64, startKey string, en
 
 // AddRegionWithReadInfo adds region with specified leader, followers and read info.
 func (mc *Cluster) AddRegionWithReadInfo(
-	regionID uint64, leaderID uint64,
+	regionID uint64, leaderStoreID uint64,
 	readBytes, readKeys uint64,
 	reportInterval uint64,
-	followerIds []uint64, filledNums ...int) []*statistics.HotPeerStat {
-	r := mc.newMockRegionInfo(regionID, leaderID, followerIds...)
+	otherPeerStoreIDs []uint64, filledNums ...int) []*statistics.HotPeerStat {
+	r := mc.newMockRegionInfo(regionID, leaderStoreID, otherPeerStoreIDs...)
 	r = r.Clone(core.SetReadBytes(readBytes))
 	r = r.Clone(core.SetReadKeys(readKeys))
 	r = r.Clone(core.SetReportInterval(reportInterval))
@@ -356,9 +357,9 @@ func (mc *Cluster) AddRegionWithReadInfo(
 }
 
 // AddRegionWithPeerReadInfo adds region with specified peer read info.
-func (mc *Cluster) AddRegionWithPeerReadInfo(regionID, leaderID, targetStoreID, readBytes, readKeys, reportInterval uint64,
-	followerIds []uint64, filledNums ...int) []*statistics.HotPeerStat {
-	r := mc.newMockRegionInfo(regionID, leaderID, followerIds...)
+func (mc *Cluster) AddRegionWithPeerReadInfo(regionID, leaderStoreID, targetStoreID, readBytes, readKeys, reportInterval uint64,
+	otherPeerStoreIDs []uint64, filledNums ...int) []*statistics.HotPeerStat {
+	r := mc.newMockRegionInfo(regionID, leaderStoreID, otherPeerStoreIDs...)
 	r = r.Clone(core.SetReadBytes(readBytes), core.SetReadKeys(readKeys), core.SetReportInterval(reportInterval))
 	filledNum := mc.HotCache.GetFilledPeriod(statistics.ReadFlow)
 	if len(filledNums) > 0 {
@@ -379,11 +380,11 @@ func (mc *Cluster) AddRegionWithPeerReadInfo(regionID, leaderID, targetStoreID,
 
 // AddRegionLeaderWithReadInfo add region leader read info
 func (mc *Cluster) AddRegionLeaderWithReadInfo(
-	regionID uint64, leaderID uint64,
+	regionID uint64, leaderStoreID uint64,
 	readBytes, readKeys uint64,
 	reportInterval uint64,
-	followerIds []uint64, filledNums ...int) []*statistics.HotPeerStat {
-	r := mc.newMockRegionInfo(regionID, leaderID, followerIds...)
+	otherPeerStoreIDs []uint64, filledNums ...int) []*statistics.HotPeerStat {
+	r := mc.newMockRegionInfo(regionID, leaderStoreID, otherPeerStoreIDs...)
 	r = r.Clone(core.SetReadBytes(readBytes))
 	r = r.Clone(core.SetReadKeys(readKeys))
 	r = r.Clone(core.SetReportInterval(reportInterval))
@@ -405,11 +406,11 @@ func (mc *Cluster) AddRegionLeaderWithReadInfo(
 
 // AddLeaderRegionWithWriteInfo adds region with specified leader and peers write info.
 func (mc *Cluster) AddLeaderRegionWithWriteInfo(
-	regionID uint64, leaderID uint64,
+	regionID uint64, leaderStoreID uint64,
 	writtenBytes, writtenKeys uint64,
 	reportInterval uint64,
-	followerIds []uint64, filledNums ...int) []*statistics.HotPeerStat {
-	r := mc.newMockRegionInfo(regionID, leaderID, followerIds...)
+	otherPeerStoreIDs []uint64, filledNums ...int) []*statistics.HotPeerStat {
+	r := mc.newMockRegionInfo(regionID, leaderStoreID, otherPeerStoreIDs...)
 	r = r.Clone(core.SetWrittenBytes(writtenBytes))
 	r = r.Clone(core.SetWrittenKeys(writtenKeys))
 	r = r.Clone(core.SetReportInterval(reportInterval))
@@ -623,8 +624,17 @@ func (mc *Cluster) UpdateStoreStatus(id uint64) {
 	mc.PutStore(newStore)
 }
 
-func (mc *Cluster) newMockRegionInfo(regionID uint64, leaderStoreID uint64, followerStoreIDs ...uint64) *core.RegionInfo {
-	return mc.MockRegionInfo(regionID, leaderStoreID, followerStoreIDs, []uint64{}, nil)
+func (mc *Cluster) newMockRegionInfo(regionID uint64, leaderStoreID uint64, otherPeerStoreIDs ...uint64) *core.RegionInfo {
+	var followerStoreIDs []uint64
+	var learnerStoreIDs []uint64
+	for _, storeID := range otherPeerStoreIDs {
+		if store := mc.GetStore(storeID); store != nil && core.IsTiFlashStore(store.GetMeta()) {
+			learnerStoreIDs = append(learnerStoreIDs, storeID)
+		} else {
+			followerStoreIDs = append(followerStoreIDs, storeID)
+		}
+	}
+	return mc.MockRegionInfo(regionID, leaderStoreID, followerStoreIDs, learnerStoreIDs, nil)
 }
 
 // CheckLabelProperty checks label property.
@@ -797,3 +807,9 @@ func (mc *Cluster) CheckRegionLeaderRead(region *core.RegionInfo) []*statistics.HotPeerStat {
 	}
 	return items
 }
+
+// ObserveRegionsStats records the current stores stats from region stats.
+func (mc *Cluster) ObserveRegionsStats() {
+	storeIDs, writeBytesRates, writeKeysRates := mc.BasicCluster.GetStoresWriteRate()
+	mc.HotStat.ObserveRegionsStats(storeIDs, writeBytesRates, writeKeysRates)
+}
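The heart of this file's change is newMockRegionInfo: a peer whose store carries the TiFlash engine label is now modeled as a learner rather than a follower, which is what lets the mock cluster exercise the hot-region scheduler against TiFlash. A minimal, self-contained sketch of that classification (the store type and isTiFlash helper here are simplified stand-ins for core.StoreInfo and core.IsTiFlashStore, not code from this commit):

package main

import "fmt"

// store is a stand-in for core.StoreInfo; only the labels matter here.
type store struct {
	labels map[string]string
}

// isTiFlash mirrors the idea behind core.IsTiFlashStore: a store is TiFlash
// when its "engine" label is "tiflash".
func isTiFlash(s *store) bool {
	return s != nil && s.labels["engine"] == "tiflash"
}

// splitPeers classifies peer store IDs the way newMockRegionInfo now does:
// TiFlash stores become learners, everything else stays a follower.
func splitPeers(stores map[uint64]*store, peerStoreIDs []uint64) (followers, learners []uint64) {
	for _, id := range peerStoreIDs {
		if isTiFlash(stores[id]) {
			learners = append(learners, id)
		} else {
			followers = append(followers, id)
		}
	}
	return followers, learners
}

func main() {
	stores := map[uint64]*store{
		1: {labels: map[string]string{}},
		2: {labels: map[string]string{"engine": "tiflash"}},
		3: {},
	}
	followers, learners := splitPeers(stores, []uint64{1, 2, 3})
	fmt.Println(followers, learners) // [1 3] [2]
}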
11 changes: 6 additions & 5 deletions server/config/config.go
@@ -226,11 +226,12 @@ const (
 
 	defaultLeaderPriorityCheckInterval = time.Minute
 
-	defaultUseRegionStorage = true
-	defaultTraceRegionFlow  = true
-	defaultFlowRoundByDigit = 3
-	defaultMaxResetTSGap    = 24 * time.Hour
-	defaultKeyType          = "table"
+	defaultUseRegionStorage  = true
+	defaultTraceRegionFlow   = true
+	defaultFlowRoundByDigit  = 3 // KB
+	maxTraceFlowRoundByDigit = 5 // 0.1 MB
+	defaultMaxResetTSGap     = 24 * time.Hour
+	defaultKeyType           = "table"
 
 	defaultStrictlyMatchLabel   = false
 	defaultEnablePlacementRules = true
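The new comments pin the units down: a FlowRoundByDigit of 3 keeps KB-level precision and 5 keeps 0.1 MB precision, which reads as dropping the lowest decimal digits of a byte count. A sketch of that reading (an assumption about the rounding, not PD's actual implementation):

package main

import (
	"fmt"
	"math"
)

// roundByDigit drops the lowest `digit` decimal digits of a flow value in
// bytes. This illustrates what the KB / 0.1 MB comments imply; it is an
// assumption, not code from this commit.
func roundByDigit(flow uint64, digit int) uint64 {
	pow := uint64(math.Pow10(digit))
	return flow / pow * pow
}

func main() {
	flow := uint64(123456789)          // bytes
	fmt.Println(roundByDigit(flow, 3)) // 123456000: KB precision
	fmt.Println(roundByDigit(flow, 5)) // 123400000: 0.1 MB precision
}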
6 changes: 6 additions & 0 deletions server/config/persist_options.go
@@ -492,6 +492,12 @@ func (o *PersistOptions) IsUseJointConsensus() bool {
 	return o.GetScheduleConfig().EnableJointConsensus
 }
 
+// IsTraceRegionFlow returns if the region flow is tracing.
+// If the accuracy cannot reach 0.1 MB, it is considered not.
+func (o *PersistOptions) IsTraceRegionFlow() bool {
+	return o.GetPDServerConfig().FlowRoundByDigit <= maxTraceFlowRoundByDigit
+}
+
 // GetHotRegionCacheHitsThreshold is a threshold to decide if a region is hot.
 func (o *PersistOptions) GetHotRegionCacheHitsThreshold() int {
 	return int(o.GetScheduleConfig().HotRegionCacheHitsThreshold)
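So region flow counts as traced only while FlowRoundByDigit leaves at least 0.1 MB of precision; the boolean TraceRegionFlow setting is effectively derived from the rounding digit. A condensed sketch of the same predicate with the config flattened to a plain int:

package main

import "fmt"

// isTraceRegionFlow reproduces the check in IsTraceRegionFlow above,
// outside PersistOptions.
func isTraceRegionFlow(flowRoundByDigit int) bool {
	const maxTraceFlowRoundByDigit = 5 // 0.1 MB
	return flowRoundByDigit <= maxTraceFlowRoundByDigit
}

func main() {
	fmt.Println(isTraceRegionFlow(3)) // true: KB precision is finer than 0.1 MB
	fmt.Println(isTraceRegionFlow(6)) // false: precision coarser than 0.1 MB
}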
7 changes: 7 additions & 0 deletions server/core/store.go
@@ -33,6 +33,13 @@ const (
 	gb                     = 1 << 30 // 1GB size
 	initialMaxRegionCounts = 30      // exclude storage Threshold Filter when region less than 30
 	initialMinSpace        = 1 << 33 // 2^33=8GB
+
+	// EngineKey is the label key used to indicate engine.
+	EngineKey = "engine"
+	// EngineTiFlash is the tiflash value of the engine label.
+	EngineTiFlash = "tiflash"
+	// EngineTiKV indicates the tikv engine in metrics
+	EngineTiKV = "tikv"
 )
 
 // StoreInfo contains information about a store.
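These constants move into core from server/schedule/filter (see the filters.go diff below) so that engine checks such as core.IsTiFlashStore can live next to StoreInfo. A sketch of how a caller might resolve a store's engine from these labels; the helper and its default-to-TiKV behavior are assumptions modeled on how the scatterer reports unlabeled stores, not part of this commit:

package main

import "fmt"

// engineOf is a hypothetical helper: it resolves a store's engine from its
// labels, treating an unlabeled store as a TiKV store.
func engineOf(labels map[string]string) string {
	const (
		engineKey     = "engine"  // core.EngineKey
		engineTiFlash = "tiflash" // core.EngineTiFlash
		engineTiKV    = "tikv"    // core.EngineTiKV
	)
	if v, ok := labels[engineKey]; ok {
		return v
	}
	return engineTiKV
}

func main() {
	fmt.Println(engineOf(map[string]string{"engine": "tiflash"})) // tiflash
	fmt.Println(engineOf(nil))                                    // tikv
}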
11 changes: 2 additions & 9 deletions server/schedule/filter/filters.go
@@ -613,7 +613,7 @@ type ordinaryEngineFilter struct {
 func NewOrdinaryEngineFilter(scope string) Filter {
 	return &ordinaryEngineFilter{
 		scope:      scope,
-		constraint: placement.LabelConstraint{Key: "engine", Op: "notIn", Values: allSpeicalEngines},
+		constraint: placement.LabelConstraint{Key: "engine", Op: "notIn", Values: allSpecialEngines},
 	}
 }
 
@@ -680,17 +680,10 @@ const (
 	SpecialUseHotRegion = "hotRegion"
 	// SpecialUseReserved is the reserved value of special use label
 	SpecialUseReserved = "reserved"
-
-	// EngineKey is the label key used to indicate engine.
-	EngineKey = "engine"
-	// EngineTiFlash is the tiflash value of the engine label.
-	EngineTiFlash = "tiflash"
-	// EngineTiKV indicates the tikv engine in metrics
-	EngineTiKV = "tikv"
 )
 
 var allSpecialUses = []string{SpecialUseHotRegion, SpecialUseReserved}
-var allSpeicalEngines = []string{EngineTiFlash}
+var allSpecialEngines = []string{core.EngineTiFlash}
 
 type isolationFilter struct {
 	scope string
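Besides the typo fix (allSpeicalEngines becomes allSpecialEngines), the filter still encodes "ordinary engine" as an engine-notIn-[tiflash] label constraint. A toy version of that constraint as a closure, assuming notIn simply means the label value matches none of the listed values:

package main

import "fmt"

// notIn builds a predicate that accepts a label set only when labels[key]
// matches none of the given values — a toy stand-in for the real
// placement.LabelConstraint with Op "notIn".
func notIn(key string, values ...string) func(map[string]string) bool {
	return func(labels map[string]string) bool {
		for _, v := range values {
			if labels[key] == v {
				return false
			}
		}
		return true
	}
}

func main() {
	ordinary := notIn("engine", "tiflash")
	fmt.Println(ordinary(map[string]string{"engine": "tiflash"})) // false
	fmt.Println(ordinary(map[string]string{}))                    // true
}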
10 changes: 5 additions & 5 deletions server/schedule/region_scatterer.go
@@ -281,7 +281,7 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) *
 		if ordinaryFilter.Target(r.cluster.GetOpts(), store) {
 			ordinaryPeers[peer.GetId()] = peer
 		} else {
-			engine := store.GetLabelValue(filter.EngineKey)
+			engine := store.GetLabelValue(core.EngineKey)
 			if _, ok := specialPeers[engine]; !ok {
 				specialPeers[engine] = make(map[uint64]*metapb.Peer)
 			}
@@ -406,7 +406,7 @@ func (r *RegionScatterer) selectAvailableLeaderStores(group string, peers map[ui
 	leaderCandidateStores := make([]uint64, 0)
 	for storeID := range peers {
 		store := r.cluster.GetStore(storeID)
-		engine := store.GetLabelValue(filter.EngineKey)
+		engine := store.GetLabelValue(core.EngineKey)
 		if len(engine) < 1 {
 			leaderCandidateStores = append(leaderCandidateStores, storeID)
 		}
@@ -435,9 +435,9 @@ func (r *RegionScatterer) Put(peers map[uint64]*metapb.Peer, leaderStoreID uint6
 			scatterDistributionCounter.WithLabelValues(
 				fmt.Sprintf("%v", storeID),
 				fmt.Sprintf("%v", false),
-				filter.EngineTiKV).Inc()
+				core.EngineTiKV).Inc()
 		} else {
-			engine := store.GetLabelValue(filter.EngineKey)
+			engine := store.GetLabelValue(core.EngineKey)
 			r.specialEngines[engine].selectedPeer.Put(storeID, group)
 			scatterDistributionCounter.WithLabelValues(
 				fmt.Sprintf("%v", storeID),
@@ -449,5 +449,5 @@ func (r *RegionScatterer) Put(peers map[uint64]*metapb.Peer, leaderStoreID uint6
 	scatterDistributionCounter.WithLabelValues(
 		fmt.Sprintf("%v", leaderStoreID),
 		fmt.Sprintf("%v", true),
-		filter.EngineTiKV).Inc()
+		core.EngineTiKV).Inc()
 }