Merge branch 'master' into tolerant-ratio2

ti-chi-bot authored Jun 7, 2021
2 parents 92ef0b3 + b5909a7 commit 4778740
Showing 18 changed files with 497 additions and 276 deletions.
73 changes: 41 additions & 32 deletions server/cluster/cluster.go
@@ -573,20 +573,25 @@ func (c *RaftCluster) HandleStoreHeartbeat(stats *pdpb.StoreStats) error {
// processRegionHeartbeat updates the region information.
func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
c.RLock()
origin, err := c.core.PreCheckPutRegion(region)
hotStat := c.hotStat
storage := c.storage
coreCluster := c.core
c.RUnlock()

origin, err := coreCluster.PreCheckPutRegion(region)
if err != nil {
c.RUnlock()
return err
}
expiredStats := c.hotStat.ExpiredItems(region)

expiredStats := hotStat.ExpiredItems(region)
// Put expiredStats into read/write queue to update stats
if len(expiredStats) > 0 {
for _, stat := range expiredStats {
item := statistics.NewExpiredStatItem(stat)
if stat.Kind == statistics.WriteFlow {
c.hotStat.CheckWriteAsync(item)
hotStat.CheckWriteAsync(item)
} else {
c.hotStat.CheckReadAsync(item)
hotStat.CheckReadAsync(item)
}
}
}
@@ -595,9 +600,8 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
for _, peer := range region.GetPeers() {
peerInfo := core.NewPeerInfo(peer, region.GetWriteLoads(), interval)
item := statistics.NewPeerInfoItem(peerInfo, region)
c.hotStat.CheckWriteAsync(item)
hotStat.CheckWriteAsync(item)
}
c.RUnlock()

// Save to storage if meta is updated.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
@@ -659,8 +663,8 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
}
// Once the flow has changed, the cache will be updated.
// Because keys and bytes are strongly related, only bytes are checked.
if c.traceRegionFlow && (region.GetRoundBytesWritten() != origin.GetRoundBytesWritten() ||
region.GetRoundBytesRead() != origin.GetRoundBytesRead()) {
if region.GetRoundBytesWritten() != origin.GetRoundBytesWritten() ||
region.GetRoundBytesRead() != origin.GetRoundBytesRead() {
saveCache, needSync = true, true
}

@@ -679,6 +683,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
time.Sleep(500 * time.Millisecond)
})

var overlaps []*core.RegionInfo
c.Lock()
if saveCache {
// To prevent a concurrent heartbeat of another region from overriding the up-to-date region info by a stale one,
@@ -689,17 +694,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
c.Unlock()
return err
}
overlaps := c.core.PutRegion(region)
if c.storage != nil {
for _, item := range overlaps {
if err := c.storage.DeleteRegion(item.GetMeta()); err != nil {
log.Error("failed to delete region from storage",
zap.Uint64("region-id", item.GetID()),
logutil.ZapRedactStringer("region-meta", core.RegionToHexMeta(item.GetMeta())),
errs.ZapError(err))
}
}
}
overlaps = c.core.PutRegion(region)
for _, item := range overlaps {
if c.regionStats != nil {
c.regionStats.ClearDefunctRegion(item.GetID())
@@ -730,24 +725,38 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
if c.regionStats != nil {
c.regionStats.Observe(region, c.getRegionStoresLocked(region))
}

changedRegions := c.changedRegions

c.Unlock()

// If there are concurrent heartbeats from the same region, the last write will win even if it
// writes to storage in the critical section. So don't use a mutex to protect it.
if saveKV && c.storage != nil {
if err := c.storage.SaveRegion(region.GetMeta()); err != nil {
// Failing to save to storage is not fatal; it only leads to a longer warm-up
// after restart. Here we only log the error and go on updating the cache.
log.Error("failed to save region to storage",
zap.Uint64("region-id", region.GetID()),
logutil.ZapRedactStringer("region-meta", core.RegionToHexMeta(region.GetMeta())),
errs.ZapError(err))
if storage != nil {
// If there are concurrent heartbeats from the same region, the last write will win even if it
// writes to storage in the critical section. So don't use a mutex to protect it.
// Failing to save to storage is not fatal; it only leads to a longer warm-up
// after restart. Here we only log the error and go on updating the cache.
for _, item := range overlaps {
if err := storage.DeleteRegion(item.GetMeta()); err != nil {
log.Error("failed to delete region from storage",
zap.Uint64("region-id", item.GetID()),
logutil.ZapRedactStringer("region-meta", core.RegionToHexMeta(item.GetMeta())),
errs.ZapError(err))
}
}
if saveKV {
if err := storage.SaveRegion(region.GetMeta()); err != nil {
log.Error("failed to save region to storage",
zap.Uint64("region-id", region.GetID()),
logutil.ZapRedactStringer("region-meta", core.RegionToHexMeta(region.GetMeta())),
errs.ZapError(err))
}
regionEventCounter.WithLabelValues("update_kv").Inc()
}
regionEventCounter.WithLabelValues("update_kv").Inc()
}

if saveKV || needSync {
select {
case c.changedRegions <- region:
case changedRegions <- region:
default:
}
}
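The cluster.go hunks above all serve one goal: processRegionHeartbeat used to hold the cluster's read lock across the hot-stat checks and only release it after queuing peer stats, and it performed storage I/O while holding the write lock. The new code snapshots the fields it needs (hotStat, storage, core, changedRegions) under a brief RLock and then does the slow work lock-free. A minimal sketch of that pattern follows, assuming a storage backend that does its own synchronization (as PD's does); all names are illustrative, not PD's real types.

package main

import (
	"fmt"
	"sync"
)

// store stands in for a storage backend that synchronizes itself,
// so callers need not hold the cluster lock around writes to it.
type store struct {
	mu   sync.Mutex
	meta map[uint64]string
}

func (s *store) SaveRegion(id uint64, meta string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.meta[id] = meta
}

type cluster struct {
	sync.RWMutex
	storage *store
	changed chan uint64
}

func (c *cluster) handleHeartbeat(id uint64, meta string) {
	// Snapshot shared fields under RLock, then release it before slow work --
	// the shape of the refactored processRegionHeartbeat.
	c.RLock()
	storage := c.storage
	changed := c.changed
	c.RUnlock()

	// Storage I/O and channel sends happen outside the critical section.
	// Concurrent heartbeats for the same region race here; the last write
	// wins, as the commit's comment notes.
	if storage != nil {
		storage.SaveRegion(id, meta)
	}
	select {
	case changed <- id:
	default: // never block the heartbeat path on a full notification channel
	}
}

func main() {
	c := &cluster{
		storage: &store{meta: map[uint64]string{}},
		changed: make(chan uint64, 1),
	}
	c.handleHeartbeat(1, "region-1")
	fmt.Println("changed region:", <-c.changed)
}

The design trade-off is accepting last-write-wins races on storage in exchange for keeping the heartbeat path from serializing on the cluster lock while writes are in flight.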
13 changes: 3 additions & 10 deletions server/cluster/cluster_test.go
@@ -622,13 +622,6 @@ func (s *testClusterInfoSuite) TestRegionFlowChanged(c *C) {
processRegions(regions)
newRegion := cluster.GetRegion(region.GetID())
c.Assert(newRegion.GetBytesRead(), Equals, uint64(1000))

// do not trace the flow changes
cluster.traceRegionFlow = false
processRegions([]*core.RegionInfo{region})
newRegion = cluster.GetRegion(region.GetID())
c.Assert(region.GetBytesRead(), Equals, uint64(0))
c.Assert(newRegion.GetBytesRead(), Not(Equals), uint64(0))
}

func (s *testClusterInfoSuite) TestConcurrentRegionHeartbeat(c *C) {
@@ -914,7 +907,7 @@ func (s *testRegionsInfoSuite) Test(c *C) {
c.Assert(cache.SearchRegion(regionKey), IsNil)
checkRegions(c, cache, regions[0:i])

cache.AddRegion(region)
cache.SetRegion(region)
checkRegion(c, cache.GetRegion(i), region)
checkRegion(c, cache.SearchRegion(regionKey), region)
checkRegions(c, cache, regions[0:(i+1)])
@@ -940,7 +933,7 @@
// Reset leader to peer 0.
newRegion = region.Clone(core.WithLeader(region.GetPeers()[0]))
regions[i] = newRegion
cache.AddRegion(newRegion)
cache.SetRegion(newRegion)
checkRegion(c, cache.GetRegion(i), newRegion)
checkRegions(c, cache, regions[0:(i+1)])
checkRegion(c, cache.SearchRegion(regionKey), newRegion)
@@ -959,7 +952,7 @@
// check overlaps
// clone it otherwise there are two items with the same key in the tree
overlapRegion := regions[n-1].Clone(core.WithStartKey(regions[n-2].GetStartKey()))
cache.AddRegion(overlapRegion)
cache.SetRegion(overlapRegion)
c.Assert(cache.GetRegion(n-2), IsNil)
c.Assert(cache.GetRegion(n-1), NotNil)

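The AddRegion-to-SetRegion renames in this test track an API change: the method upserts a region into the tree and evicts any regions whose key ranges it overlaps, which is why the test expects GetRegion(n-2) to be nil after inserting overlapRegion. Below is a hedged toy sketch of those semantics, using a plain slice where PD uses a btree; all names here are illustrative.

package main

import "fmt"

type region struct {
	id         uint64
	start, end string // end == "" means unbounded on the right
}

func overlaps(a, b region) bool {
	aBeforeB := a.end != "" && a.end <= b.start
	bBeforeA := b.end != "" && b.end <= a.start
	return !aBeforeB && !bBeforeA
}

type cache struct{ regions []region }

// SetRegion upserts r and drops every region overlapping it,
// returning the evicted regions.
func (c *cache) SetRegion(r region) (evicted []region) {
	kept := c.regions[:0]
	for _, old := range c.regions {
		switch {
		case old.id == r.id:
			// replaced by r below
		case overlaps(old, r):
			evicted = append(evicted, old)
		default:
			kept = append(kept, old)
		}
	}
	c.regions = append(kept, r)
	return evicted
}

func main() {
	c := &cache{}
	c.SetRegion(region{id: 1, start: "a", end: "b"})
	c.SetRegion(region{id: 2, start: "b", end: "c"})
	// Grow region 2 backwards so it swallows region 1's range,
	// like the overlapRegion clone in the test above.
	evicted := c.SetRegion(region{id: 2, start: "a", end: "c"})
	fmt.Println(len(evicted), len(c.regions)) // 1 1
}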
98 changes: 44 additions & 54 deletions server/cluster/coordinator.go
@@ -424,8 +424,7 @@ func (c *coordinator) stop() {
// Hack to retrieve info from scheduler.
// TODO: remove it.
type hasHotStatus interface {
GetHotReadStatus() *statistics.StoreHotPeersInfos
GetHotWriteStatus() *statistics.StoreHotPeersInfos
GetHotStatus(typ string) *statistics.StoreHotPeersInfos
GetPendingInfluence() map[uint64]*schedulers.Influence
}

@@ -437,7 +436,7 @@ func (c *coordinator) getHotWriteRegions() *statistics.StoreHotPeersInfos {
return nil
}
if h, ok := s.Scheduler.(hasHotStatus); ok {
return h.GetHotWriteStatus()
return h.GetHotStatus(schedulers.HotWriteRegionType)
}
return nil
}
@@ -450,7 +449,7 @@ func (c *coordinator) getHotReadRegions() *statistics.StoreHotPeersInfos {
return nil
}
if h, ok := s.Scheduler.(hasHotStatus); ok {
return h.GetHotReadStatus()
return h.GetHotStatus(schedulers.HotReadRegionType)
}
return nil
}
@@ -503,74 +502,65 @@ func (c *coordinator) collectHotSpotMetrics() {
}
c.RUnlock()
stores := c.cluster.GetStores()
status := s.Scheduler.(hasHotStatus).GetHotWriteStatus()
pendings := s.Scheduler.(hasHotStatus).GetPendingInfluence()
for _, s := range stores {
storeAddress := s.GetAddress()
storeID := s.GetID()
storeLabel := fmt.Sprintf("%d", storeID)
stat, ok := status.AsPeer[storeID]
if ok {
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_bytes_as_peer").Set(stat.TotalLoads[statistics.RegionWriteBytes])
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_keys_as_peer").Set(stat.TotalLoads[statistics.RegionWriteKeys])
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_write_region_as_peer").Set(float64(stat.Count))
} else {
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_bytes_as_peer").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_write_region_as_peer").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_keys_as_peer").Set(0)
}
// Collects hot write region metrics.
collectHotMetrics(s, stores, schedulers.HotWriteRegionType)
// Collects hot read region metrics.
collectHotMetrics(s, stores, schedulers.HotReadRegionType)
// Collects pending influence.
collectPendingInfluence(s, stores)
}

stat, ok = status.AsLeader[storeID]
if ok {
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_bytes_as_leader").Set(stat.TotalLoads[statistics.RegionWriteBytes])
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_keys_as_leader").Set(stat.TotalLoads[statistics.RegionWriteKeys])
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_write_region_as_leader").Set(float64(stat.Count))
} else {
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_bytes_as_leader").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_keys_as_leader").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_write_region_as_leader").Set(0)
}
func collectHotMetrics(s *scheduleController, stores []*core.StoreInfo, typ string) {
status := s.Scheduler.(hasHotStatus).GetHotStatus(typ)
var (
kind string
byteTyp, keyTyp statistics.RegionStatKind
)

// TODO: add to tidb-ansible after merging pending influence into operator influence.
if infl := pendings[storeID]; infl != nil {
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "write_pending_influence_byte_rate").Set(infl.Loads[statistics.ByteDim])
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "write_pending_influence_key_rate").Set(infl.Loads[statistics.KeyDim])
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "write_pending_influence_count").Set(infl.Count)
}
switch typ {
case schedulers.HotReadRegionType:
kind, byteTyp, keyTyp = "read", statistics.RegionReadBytes, statistics.RegionReadKeys
case schedulers.HotWriteRegionType:
kind, byteTyp, keyTyp = "write", statistics.RegionWriteBytes, statistics.RegionWriteKeys
}

// Collects hot read region metrics.
status = s.Scheduler.(hasHotStatus).GetHotReadStatus()
for _, s := range stores {
storeAddress := s.GetAddress()
storeID := s.GetID()
storeLabel := fmt.Sprintf("%d", storeID)
stat, ok := status.AsLeader[storeID]
if ok {
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_read_bytes_as_leader").Set(stat.TotalLoads[statistics.RegionReadBytes])
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_read_keys_as_leader").Set(stat.TotalLoads[statistics.RegionReadKeys])
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_read_region_as_leader").Set(float64(stat.Count))
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_"+kind+"_bytes_as_leader").Set(stat.TotalLoads[byteTyp])
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_"+kind+"_keys_as_leader").Set(stat.TotalLoads[keyTyp])
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_"+kind+"_region_as_leader").Set(float64(stat.Count))
} else {
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_read_bytes_as_leader").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_read_keys_as_leader").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_read_region_as_leader").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_"+kind+"_bytes_as_leader").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_"+kind+"_keys_as_leader").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_"+kind+"_region_as_leader").Set(0)
}

stat, ok = status.AsPeer[storeID]
if ok {
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_read_bytes_as_peer").Set(stat.TotalLoads[statistics.RegionReadBytes])
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_read_keys_as_peer").Set(stat.TotalLoads[statistics.RegionReadKeys])
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_read_region_as_peer").Set(float64(stat.Count))
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_"+kind+"_bytes_as_peer").Set(stat.TotalLoads[byteTyp])
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_"+kind+"_keys_as_peer").Set(stat.TotalLoads[keyTyp])
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_"+kind+"_region_as_peer").Set(float64(stat.Count))
} else {
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_read_bytes_as_peer").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_read_keys_as_peer").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_read_region_as_peer").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_"+kind+"_bytes_as_peer").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_"+kind+"_keys_as_peer").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_"+kind+"_region_as_peer").Set(0)
}
}
}

func collectPendingInfluence(s *scheduleController, stores []*core.StoreInfo) {
pendings := s.Scheduler.(hasHotStatus).GetPendingInfluence()
for _, s := range stores {
storeAddress := s.GetAddress()
storeID := s.GetID()
storeLabel := fmt.Sprintf("%d", storeID)
if infl := pendings[storeID]; infl != nil {
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "read_pending_influence_byte_rate").Set(infl.Loads[statistics.ByteDim])
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "read_pending_influence_key_rate").Set(infl.Loads[statistics.KeyDim])
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "read_pending_influence_count").Set(infl.Count)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "pending_influence_byte_rate").Set(infl.Loads[statistics.ByteDim])
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "pending_influence_key_rate").Set(infl.Loads[statistics.KeyDim])
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "pending_influence_count").Set(infl.Count)
}
}
}
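The coordinator.go change collapses two nearly identical metric loops into one: hasHotStatus now exposes a single GetHotStatus(typ), and collectHotMetrics derives the kind label and statistics indices from the type. A hedged sketch of that dispatch follows, with simplified stand-in types and constant values rather than PD's real ones.

package main

import "fmt"

const (
	HotReadRegionType  = "hot-read-region"
	HotWriteRegionType = "hot-write-region"
)

type storeStat struct {
	totalBytes float64
	count      int
}

type hotStatus struct {
	asLeader map[uint64]storeStat
	asPeer   map[uint64]storeStat
}

// hasHotStatus mirrors the narrowed interface in the diff.
type hasHotStatus interface {
	GetHotStatus(typ string) *hotStatus
}

func collectHotMetrics(h hasHotStatus, storeIDs []uint64, typ string) {
	status := h.GetHotStatus(typ)
	var kind string
	switch typ {
	case HotReadRegionType:
		kind = "read"
	case HotWriteRegionType:
		kind = "write"
	}
	for _, id := range storeIDs {
		// In PD this sets hotSpotStatusGauge with the store's address and
		// label; printing stands in for the gauge here.
		if stat, ok := status.asLeader[id]; ok {
			fmt.Printf("store %d total_%s_bytes_as_leader=%.0f hot_%s_region_as_leader=%d\n",
				id, kind, stat.totalBytes, kind, stat.count)
		}
	}
}

type fakeScheduler struct{}

func (fakeScheduler) GetHotStatus(typ string) *hotStatus {
	return &hotStatus{asLeader: map[uint64]storeStat{1: {totalBytes: 42, count: 1}}}
}

func main() {
	collectHotMetrics(fakeScheduler{}, []uint64{1}, HotReadRegionType)
	collectHotMetrics(fakeScheduler{}, []uint64{1}, HotWriteRegionType)
}

The same parameterization is what lets the diff merge the read and write pending-influence gauges into a single pending_influence_* family.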
33 changes: 30 additions & 3 deletions server/config/config.go
@@ -19,6 +19,7 @@ import (
"encoding/json"
"flag"
"fmt"
"math"
"net/url"
"os"
"path/filepath"
@@ -1076,9 +1077,9 @@ type PDServerConfig struct {
MetricStorage string `toml:"metric-storage" json:"metric-storage"`
// There are some values supported: "auto", "none", or a specific address, default: "auto"
DashboardAddress string `toml:"dashboard-address" json:"dashboard-address"`
// TraceRegionFlow the option to update flow information of regions
// TODO: deprecate
TraceRegionFlow bool `toml:"trace-region-flow" json:"trace-region-flow,string"`
// TraceRegionFlow is the option to update flow information of regions.
// WARN: TraceRegionFlow is deprecated.
TraceRegionFlow bool `toml:"trace-region-flow" json:"trace-region-flow,string,omitempty"`
// FlowRoundByDigit is used to discretize the region flow information.
FlowRoundByDigit int `toml:"flow-round-by-digit" json:"flow-round-by-digit"`
}
@@ -1103,9 +1104,35 @@ func (c *PDServerConfig) adjust(meta *configMetaData) error {
if !meta.IsDefined("flow-round-by-digit") {
adjustInt(&c.FlowRoundByDigit, defaultFlowRoundByDigit)
}
if err := c.migrateConfigurationFromFile(meta); err != nil {
return err
}
return c.Validate()
}

func (c *PDServerConfig) migrateConfigurationFromFile(meta *configMetaData) error {
oldName, newName := "trace-region-flow", "flow-round-by-digit"
defineOld, defineNew := meta.IsDefined(oldName), meta.IsDefined(newName)
switch {
case defineOld && defineNew:
if c.TraceRegionFlow && (c.FlowRoundByDigit == defaultFlowRoundByDigit) {
return errors.Errorf("config item %s and %s (deprecated) are in conflict", newName, oldName)
}
case defineOld && !defineNew:
if !c.TraceRegionFlow {
c.FlowRoundByDigit = math.MaxInt8
}
}
return nil
}

// MigrateDeprecatedFlags updates new flags according to deprecated flags.
func (c *PDServerConfig) MigrateDeprecatedFlags() {
if !c.TraceRegionFlow {
c.FlowRoundByDigit = math.MaxInt8
}
// JSON omits the false value, so it will not be persisted to the kv next time.
c.TraceRegionFlow = false
}

// Clone returns a cloned PD server config.
func (c *PDServerConfig) Clone() *PDServerConfig {
runtimeServices := append(c.RuntimeServices[:0:0], c.RuntimeServices...)
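The config.go change deprecates trace-region-flow in favor of flow-round-by-digit: when only the old key is set and tracing is disabled, FlowRoundByDigit is forced to math.MaxInt8, which rounds flow values so coarsely that they are effectively not traced at all. The sketch below shows why that substitution works; the rounding helper is an assumption about the mechanism, not PD's actual implementation.

package main

import (
	"fmt"
	"math"
)

// roundByDigit keeps a value only down to the given number of decimal
// digits, the way flow-round-by-digit discretizes region flow (assumed
// mechanism, for illustration).
func roundByDigit(v uint64, digit int) uint64 {
	if digit <= 0 {
		return v
	}
	if digit >= 20 {
		// 10^20 already exceeds the uint64 range, so any byte count rounds to 0.
		return 0
	}
	p := uint64(math.Pow10(digit))
	return v / p * p
}

func main() {
	written := uint64(123_456_789)
	fmt.Println(roundByDigit(written, 3))            // 123456000: coarse but still traced
	fmt.Println(roundByDigit(written, math.MaxInt8)) // 0: flow effectively untraced
}

Folding the deprecated boolean into the numeric knob means only one code path needs to exist, and MigrateDeprecatedFlags can clear TraceRegionFlow so it is never persisted again.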
4 changes: 4 additions & 0 deletions server/config/config_test.go
@@ -16,6 +16,7 @@ package config
import (
"encoding/json"
"fmt"
"math"
"os"
"path"
"strings"
@@ -284,13 +285,16 @@ func (s *testConfigSuite) TestMigrateFlags(c *C) {
return cfg, err
}
cfg, err := load(`
[pd-server]
trace-region-flow = false
[schedule]
disable-remove-down-replica = true
enable-make-up-replica = false
disable-remove-extra-replica = true
enable-remove-extra-replica = false
`)
c.Assert(err, IsNil)
c.Assert(cfg.PDServerCfg.FlowRoundByDigit, Equals, math.MaxInt8)
c.Assert(cfg.Schedule.EnableReplaceOfflineReplica, IsTrue)
c.Assert(cfg.Schedule.EnableRemoveDownReplica, IsFalse)
c.Assert(cfg.Schedule.EnableMakeUpReplica, IsFalse)