From a706d8fb083848b96e50ddee725c6bfa55c85558 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Mon, 27 Dec 2021 16:21:50 +0800 Subject: [PATCH 1/6] api, pd-ctl: fix the region range hole end key check and add some help info (#4498) * Fix the region range hole end key check and add some help info (close #4496 #4216) Signed-off-by: JmPotato * Fix the typo Signed-off-by: JmPotato --- server/api/region_test.go | 1 + server/core/region.go | 4 +++ tests/pdctl/region/region_test.go | 1 + tools/pd-ctl/pdctl/command/region_command.go | 27 +++++++++++++++++--- 4 files changed, 30 insertions(+), 3 deletions(-) diff --git a/server/api/region_test.go b/server/api/region_test.go index b57553bc49c2..d903afa86015 100644 --- a/server/api/region_test.go +++ b/server/api/region_test.go @@ -536,6 +536,7 @@ func (s *testGetRegionRangeHolesSuite) TestRegionRangeHoles(c *C) { {"", core.HexRegionKeyStr(r1.GetStartKey())}, {core.HexRegionKeyStr(r1.GetEndKey()), core.HexRegionKeyStr(r3.GetStartKey())}, {core.HexRegionKeyStr(r4.GetEndKey()), core.HexRegionKeyStr(r6.GetStartKey())}, + {core.HexRegionKeyStr(r6.GetEndKey()), ""}, }) } diff --git a/server/core/region.go b/server/core/region.go index aa403dc76677..381aa50c6b8e 100644 --- a/server/core/region.go +++ b/server/core/region.go @@ -1143,6 +1143,10 @@ func (r *RegionsInfo) GetRangeHoles() [][]string { lastEndKey = region.GetEndKey() return true }) + // If the last end key is not empty, it means there is a range hole at the end. 
+ if len(lastEndKey) > 0 { + rangeHoles = append(rangeHoles, []string{HexRegionKeyStr(lastEndKey), ""}) + } return rangeHoles } diff --git a/tests/pdctl/region/region_test.go b/tests/pdctl/region/region_test.go index 6423b97e81e3..507094cadceb 100644 --- a/tests/pdctl/region/region_test.go +++ b/tests/pdctl/region/region_test.go @@ -201,5 +201,6 @@ func (s *regionTestSuite) TestRegion(c *C) { c.Assert(*rangeHoles, DeepEquals, [][]string{ {"", core.HexRegionKeyStr(r1.GetStartKey())}, {core.HexRegionKeyStr(r4.GetEndKey()), core.HexRegionKeyStr(r5.GetStartKey())}, + {core.HexRegionKeyStr(r5.GetEndKey()), ""}, }) } diff --git a/tools/pd-ctl/pdctl/command/region_command.go b/tools/pd-ctl/pdctl/command/region_command.go index c16dc983df9e..600a6ebaa88d 100644 --- a/tools/pd-ctl/pdctl/command/region_command.go +++ b/tools/pd-ctl/pdctl/command/region_command.go @@ -519,12 +519,33 @@ func showRegionWithStoreCommandFunc(cmd *cobra.Command, args []string) { cmd.Println(r) } +const ( + rangeHolesLongDesc = `There are some cases that the region range is not continuous, for example, the region doesn't send the heartbeat to PD after a splitting. +This command will output all empty ranges without any region info.` + rangeHolesExample = ` + If PD now holds the region ranges info like ["", "a"], ["b", "x"], ["x", "z"]. 
The the output will be like: + + [ + [ + "a", + "b" + ], + [ + "z", + "" + ], + ] +` +) + // NewRangesWithRangeHolesCommand returns ranges with range-holes subcommand of regionCmd func NewRangesWithRangeHolesCommand() *cobra.Command { r := &cobra.Command{ - Use: "range-holes", - Short: "show all empty ranges without any region info", - Run: showRangesWithRangeHolesCommandFunc, + Use: "range-holes", + Short: "show all empty ranges without any region info.", + Long: rangeHolesLongDesc, + Example: rangeHolesExample, + Run: showRangesWithRangeHolesCommandFunc, } return r } From ea57c6d6a0c9ad081aded76545f3d97bc465635f Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Mon, 27 Dec 2021 20:41:49 +0800 Subject: [PATCH 2/6] *: move `parseUrls` to util (#4506) * move parse url to util close #4505 Signed-off-by: Ryan Leung * address the comment Signed-off-by: Ryan Leung --- pkg/apiutil/serverapi/middleware.go | 15 +++++++++------ server/config/config.go | 25 ++++--------------------- server/config/util.go | 18 ++++++++++++++++++ 3 files changed, 31 insertions(+), 27 deletions(-) diff --git a/pkg/apiutil/serverapi/middleware.go b/pkg/apiutil/serverapi/middleware.go index 973775d1214b..1fea0e1b9d07 100644 --- a/pkg/apiutil/serverapi/middleware.go +++ b/pkg/apiutil/serverapi/middleware.go @@ -18,12 +18,10 @@ import ( "io" "net/http" "net/url" - "strings" "github.com/pingcap/log" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/server" - "github.com/tikv/pd/server/config" "github.com/urfave/negroni" "go.uber.org/zap" ) @@ -113,11 +111,16 @@ func (h *redirector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http http.Error(w, "no leader", http.StatusServiceUnavailable) return } + clientUrls := leader.GetClientUrls() + urls := make([]url.URL, 0, len(clientUrls)) + for _, item := range clientUrls { + u, err := url.Parse(item) + if err != nil { + http.Error(w, errs.ErrURLParse.Wrap(err).GenWithStackByCause().Error(), http.StatusInternalServerError) + return + } - urls, err := 
config.ParseUrls(strings.Join(leader.GetClientUrls(), ",")) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return + urls = append(urls, *u) } client := h.s.GetHTTPClient() NewCustomReverseProxies(client, urls).ServeHTTP(w, r) diff --git a/server/config/config.go b/server/config/config.go index b02ab0e74b3c..930d459b0f34 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -1199,23 +1199,6 @@ func (c LabelPropertyConfig) Clone() LabelPropertyConfig { return m } -// ParseUrls parse a string into multiple urls. -// Export for api. -func ParseUrls(s string) ([]url.URL, error) { - items := strings.Split(s, ",") - urls := make([]url.URL, 0, len(items)) - for _, item := range items { - u, err := url.Parse(item) - if err != nil { - return nil, errs.ErrURLParse.Wrap(err).GenWithStackByCause() - } - - urls = append(urls, *u) - } - - return urls, nil -} - // SetupLogger setup the logger. func (c *Config) SetupLogger() error { lg, p, err := log.InitLogger(&c.Log, zap.AddStacktrace(zapcore.FatalLevel)) @@ -1283,22 +1266,22 @@ func (c *Config) GenEmbedEtcdConfig() (*embed.Config, error) { cfg.Logger = "zap" var err error - cfg.LPUrls, err = ParseUrls(c.PeerUrls) + cfg.LPUrls, err = parseUrls(c.PeerUrls) if err != nil { return nil, err } - cfg.APUrls, err = ParseUrls(c.AdvertisePeerUrls) + cfg.APUrls, err = parseUrls(c.AdvertisePeerUrls) if err != nil { return nil, err } - cfg.LCUrls, err = ParseUrls(c.ClientUrls) + cfg.LCUrls, err = parseUrls(c.ClientUrls) if err != nil { return nil, err } - cfg.ACUrls, err = ParseUrls(c.AdvertiseClientUrls) + cfg.ACUrls, err = parseUrls(c.AdvertiseClientUrls) if err != nil { return nil, err } diff --git a/server/config/util.go b/server/config/util.go index 7683f0dde0f2..af11a7d8fbe2 100644 --- a/server/config/util.go +++ b/server/config/util.go @@ -17,9 +17,11 @@ package config import ( "net/url" "regexp" + "strings" "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/metapb" + 
"github.com/tikv/pd/pkg/errs" ) const ( @@ -87,3 +89,19 @@ func NewTestOptions() *PersistOptions { c.Adjust(nil, false) return NewPersistOptions(c) } + +// parseUrls parse a string into multiple urls. +func parseUrls(s string) ([]url.URL, error) { + items := strings.Split(s, ",") + urls := make([]url.URL, 0, len(items)) + for _, item := range items { + u, err := url.Parse(item) + if err != nil { + return nil, errs.ErrURLParse.Wrap(err).GenWithStackByCause() + } + + urls = append(urls, *u) + } + + return urls, nil +} From 909f2fdebb3f06b704dfc4498b498708efe9ee32 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Tue, 28 Dec 2021 13:41:49 +0800 Subject: [PATCH 3/6] statistics: small refactor of hot statistics (#4461) * small refactor of hot statistics close #4463 Signed-off-by: Ryan Leung * address the comment Signed-off-by: Ryan Leung * address the comment Signed-off-by: Ryan Leung Co-authored-by: Ti Chi Robot --- pkg/mock/mockcluster/mockcluster.go | 10 +- server/cluster/cluster.go | 6 +- server/handler.go | 2 +- server/schedulers/hot_region_test.go | 6 +- server/statistics/hot_cache.go | 102 +++++++++------------ server/statistics/hot_cache_task.go | 54 +++++------ server/statistics/hot_peer.go | 60 ++++++------ server/statistics/hot_peer_cache.go | 111 ++++++++++++----------- server/statistics/hot_peer_cache_test.go | 70 +++++++------- server/statistics/kind.go | 22 +++++ 10 files changed, 225 insertions(+), 218 deletions(-) diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go index ce8c79652ab4..0864e9e525c3 100644 --- a/pkg/mock/mockcluster/mockcluster.go +++ b/pkg/mock/mockcluster/mockcluster.go @@ -130,7 +130,7 @@ func (mc *Cluster) RegionWriteStats() map[uint64][]*statistics.HotPeerStat { // HotRegionsFromStore picks hot regions in specify store. 
func (mc *Cluster) HotRegionsFromStore(store uint64, kind statistics.RWType) []*core.RegionInfo { - stats := mc.HotCache.HotRegionsFromStore(store, kind, mc.GetHotRegionCacheHitsThreshold()) + stats := hotRegionsFromStore(mc.HotCache, store, kind, mc.GetHotRegionCacheHitsThreshold()) regions := make([]*core.RegionInfo, 0, len(stats)) for _, stat := range stats { region := mc.GetRegion(stat.RegionID) @@ -141,6 +141,14 @@ func (mc *Cluster) HotRegionsFromStore(store uint64, kind statistics.RWType) []* return regions } +// hotRegionsFromStore picks hot region in specify store. +func hotRegionsFromStore(w *statistics.HotCache, storeID uint64, kind statistics.RWType, minHotDegree int) []*statistics.HotPeerStat { + if stats, ok := w.RegionStats(kind, minHotDegree)[storeID]; ok && len(stats) > 0 { + return stats + } + return nil +} + // AllocPeer allocs a new peer on a store. func (mc *Cluster) AllocPeer(storeID uint64) (*metapb.Peer, error) { peerID, err := mc.AllocID() diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index fa9fce926efb..c6738b916fb6 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -594,11 +594,11 @@ func (c *RaftCluster) HandleStoreHeartbeat(stats *pdpb.StoreStats) error { c.limiter.Collect(newStore.GetStoreStats()) } - regionIDs := make(map[uint64]struct{}, len(stats.GetPeerStats())) + regions := make(map[uint64]*core.RegionInfo, len(stats.GetPeerStats())) for _, peerStat := range stats.GetPeerStats() { regionID := peerStat.GetRegionId() - regionIDs[regionID] = struct{}{} region := c.GetRegion(regionID) + regions[regionID] = region if region == nil { log.Warn("discard hot peer stat for unknown region", zap.Uint64("region-id", regionID), @@ -624,7 +624,7 @@ func (c *RaftCluster) HandleStoreHeartbeat(stats *pdpb.StoreStats) error { peerInfo := core.NewPeerInfo(peer, loads, interval) c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(peerInfo, region)) } - 
c.hotStat.CheckReadAsync(statistics.NewCollectUnReportedPeerTask(storeID, regionIDs, interval)) + c.hotStat.CheckReadAsync(statistics.NewCollectUnReportedPeerTask(storeID, regions, interval)) return nil } diff --git a/server/handler.go b/server/handler.go index df3285120b63..46fc5c984553 100644 --- a/server/handler.go +++ b/server/handler.go @@ -999,7 +999,7 @@ func (h *Handler) packHotRegions(hotPeersStat statistics.StoreHotPeersStat, hotR } } stat := core.HistoryHotRegion{ - // store in ms. + // store in ms. UpdateTime: hotPeerStat.LastUpdateTime.UnixNano() / int64(time.Millisecond), RegionID: hotPeerStat.RegionID, StoreID: hotPeerStat.StoreID, diff --git a/server/schedulers/hot_region_test.go b/server/schedulers/hot_region_test.go index dd7192cba55e..39328ceac744 100644 --- a/server/schedulers/hot_region_test.go +++ b/server/schedulers/hot_region_test.go @@ -1552,7 +1552,7 @@ func (s *testHotCacheSuite) TestCheckRegionFlow(c *C) { c.Check(len(items), Greater, 0) for _, item := range items { if item.StoreID == 3 { - c.Check(item.IsNeedDelete(), IsTrue) + c.Check(item.GetActionType(), Equals, statistics.Remove) continue } c.Check(item.HotDegree, Equals, testcase.DegreeAfterTransferLeader+2) @@ -1586,9 +1586,9 @@ func (s *testHotCacheSuite) TestCheckRegionFlowWithDifferentThreshold(c *C) { items = tc.AddLeaderRegionWithWriteInfo(201, 1, rate*statistics.WriteReportInterval, 0, 0, statistics.WriteReportInterval, []uint64{3, 4}, 1) for _, item := range items { if item.StoreID < 4 { - c.Check(item.IsNeedDelete(), IsTrue) + c.Check(item.GetActionType(), Equals, statistics.Remove) } else { - c.Check(item.IsNeedDelete(), IsFalse) + c.Check(item.GetActionType(), Equals, statistics.Update) } } } diff --git a/server/statistics/hot_cache.go b/server/statistics/hot_cache.go index 125b5f6206e8..ece8932c959c 100644 --- a/server/statistics/hot_cache.go +++ b/server/statistics/hot_cache.go @@ -28,43 +28,39 @@ const queueCap = 20000 // HotCache is a cache hold hot regions. 
type HotCache struct { - ctx context.Context - readFlowQueue chan FlowItemTask - writeFlowQueue chan FlowItemTask - writeFlow *hotPeerCache - readFlow *hotPeerCache + ctx context.Context + writeCache *hotPeerCache + readCache *hotPeerCache } // NewHotCache creates a new hot spot cache. func NewHotCache(ctx context.Context) *HotCache { w := &HotCache{ - ctx: ctx, - readFlowQueue: make(chan FlowItemTask, queueCap), - writeFlowQueue: make(chan FlowItemTask, queueCap), - writeFlow: NewHotPeerCache(Write), - readFlow: NewHotPeerCache(Read), + ctx: ctx, + writeCache: NewHotPeerCache(Write), + readCache: NewHotPeerCache(Read), } - go w.updateItems(w.readFlowQueue, w.runReadTask) - go w.updateItems(w.writeFlowQueue, w.runWriteTask) + go w.updateItems(w.readCache.taskQueue, w.runReadTask) + go w.updateItems(w.writeCache.taskQueue, w.runWriteTask) return w } // CheckWritePeerSync checks the write status, returns update items. // This is used for mockcluster. func (w *HotCache) CheckWritePeerSync(peer *core.PeerInfo, region *core.RegionInfo) *HotPeerStat { - return w.writeFlow.CheckPeerFlow(peer, region) + return w.writeCache.checkPeerFlow(peer, region) } // CheckReadPeerSync checks the read status, returns update items. // This is used for mockcluster. 
func (w *HotCache) CheckReadPeerSync(peer *core.PeerInfo, region *core.RegionInfo) *HotPeerStat { - return w.readFlow.CheckPeerFlow(peer, region) + return w.readCache.checkPeerFlow(peer, region) } // CheckWriteAsync puts the flowItem into queue, and check it asynchronously -func (w *HotCache) CheckWriteAsync(task FlowItemTask) bool { +func (w *HotCache) CheckWriteAsync(task flowItemTask) bool { select { - case w.writeFlowQueue <- task: + case w.writeCache.taskQueue <- task: return true default: return false @@ -72,9 +68,9 @@ func (w *HotCache) CheckWriteAsync(task FlowItemTask) bool { } // CheckReadAsync puts the flowItem into queue, and check it asynchronously -func (w *HotCache) CheckReadAsync(task FlowItemTask) bool { +func (w *HotCache) CheckReadAsync(task flowItemTask) bool { select { - case w.readFlowQueue <- task: + case w.readCache.taskQueue <- task: return true default: return false @@ -86,39 +82,26 @@ func (w *HotCache) CheckReadAsync(task FlowItemTask) bool { func (w *HotCache) Update(item *HotPeerStat) { switch item.Kind { case Write: - update(item, w.writeFlow) + updateStat(w.writeCache, item) case Read: - update(item, w.readFlow) + updateStat(w.readCache, item) } } // RegionStats returns hot items according to kind func (w *HotCache) RegionStats(kind RWType, minHotDegree int) map[uint64][]*HotPeerStat { + task := newCollectRegionStatsTask(minHotDegree) + var succ bool switch kind { case Write: - task := newCollectRegionStatsTask(minHotDegree) - succ := w.CheckWriteAsync(task) - if !succ { - return nil - } - return task.waitRet(w.ctx) + succ = w.CheckWriteAsync(task) case Read: - task := newCollectRegionStatsTask(minHotDegree) - succ := w.CheckReadAsync(task) - if !succ { - return nil - } - return task.waitRet(w.ctx) + succ = w.CheckReadAsync(task) } - return nil -} - -// HotRegionsFromStore picks hot region in specify store. 
-func (w *HotCache) HotRegionsFromStore(storeID uint64, kind RWType, minHotDegree int) []*HotPeerStat { - if stats, ok := w.RegionStats(kind, minHotDegree)[storeID]; ok && len(stats) > 0 { - return stats + if !succ { + return nil } - return nil + return task.waitRet(w.ctx) } // IsRegionHot checks if the region is hot. @@ -149,13 +132,13 @@ func (w *HotCache) ResetMetrics() { // ExpiredReadItems returns the read items which are already expired. // This is used for mockcluster. func (w *HotCache) ExpiredReadItems(region *core.RegionInfo) []*HotPeerStat { - return w.readFlow.CollectExpiredItems(region) + return w.readCache.collectExpiredItems(region) } // ExpiredWriteItems returns the write items which are already expired. // This is used for mockcluster. func (w *HotCache) ExpiredWriteItems(region *core.RegionInfo) []*HotPeerStat { - return w.writeFlow.CollectExpiredItems(region) + return w.writeCache.collectExpiredItems(region) } func incMetrics(name string, storeID uint64, kind RWType) { @@ -172,14 +155,14 @@ func incMetrics(name string, storeID uint64, kind RWType) { func (w *HotCache) GetFilledPeriod(kind RWType) int { switch kind { case Write: - return w.writeFlow.getDefaultTimeMedian().GetFilledPeriod() + return w.writeCache.getDefaultTimeMedian().GetFilledPeriod() case Read: - return w.readFlow.getDefaultTimeMedian().GetFilledPeriod() + return w.readCache.getDefaultTimeMedian().GetFilledPeriod() } return 0 } -func (w *HotCache) updateItems(queue <-chan FlowItemTask, runTask func(task FlowItemTask)) { +func (w *HotCache) updateItems(queue <-chan flowItemTask, runTask func(task flowItemTask)) { for { select { case <-w.ctx.Done(): @@ -190,29 +173,30 @@ func (w *HotCache) updateItems(queue <-chan FlowItemTask, runTask func(task Flow } } -func (w *HotCache) runReadTask(task FlowItemTask) { +func (w *HotCache) runReadTask(task flowItemTask) { if task != nil { - // TODO: do we need a run-task timeout to protect the queue won't be stucked by a task? 
- task.runTask(w.readFlow) - hotCacheFlowQueueStatusGauge.WithLabelValues(Read.String()).Set(float64(len(w.readFlowQueue))) + // TODO: do we need a run-task timeout to protect the queue won't be stuck by a task? + task.runTask(w.readCache) + hotCacheFlowQueueStatusGauge.WithLabelValues(Read.String()).Set(float64(len(w.readCache.taskQueue))) } } -func (w *HotCache) runWriteTask(task FlowItemTask) { +func (w *HotCache) runWriteTask(task flowItemTask) { if task != nil { - // TODO: do we need a run-task timeout to protect the queue won't be stucked by a task? - task.runTask(w.writeFlow) - hotCacheFlowQueueStatusGauge.WithLabelValues(Write.String()).Set(float64(len(w.writeFlowQueue))) + // TODO: do we need a run-task timeout to protect the queue won't be stuck by a task? + task.runTask(w.writeCache) + hotCacheFlowQueueStatusGauge.WithLabelValues(Write.String()).Set(float64(len(w.writeCache.taskQueue))) } } -func update(item *HotPeerStat, flow *hotPeerCache) { - flow.Update(item) - if item.IsNeedDelete() { - incMetrics("remove_item", item.StoreID, item.Kind) - } else if item.IsNew() { +func updateStat(cache *hotPeerCache, item *HotPeerStat) { + cache.update(item) + switch item.actionType { + case Add: incMetrics("add_item", item.StoreID, item.Kind) - } else { + case Remove: + incMetrics("remove_item", item.StoreID, item.Kind) + case Update: incMetrics("update_item", item.StoreID, item.Kind) } } diff --git a/server/statistics/hot_cache_task.go b/server/statistics/hot_cache_task.go index 672306b35d06..2f71c5ecee56 100644 --- a/server/statistics/hot_cache_task.go +++ b/server/statistics/hot_cache_task.go @@ -31,10 +31,10 @@ const ( collectMetricsTaskType ) -// FlowItemTask indicates the task in flowItem queue -type FlowItemTask interface { +// flowItemTask indicates the task in flowItem queue +type flowItemTask interface { taskType() flowItemTaskKind - runTask(flow *hotPeerCache) + runTask(cache *hotPeerCache) } type checkPeerTask struct { @@ -43,7 +43,7 @@ type 
checkPeerTask struct { } // NewCheckPeerTask creates task to update peerInfo -func NewCheckPeerTask(peerInfo *core.PeerInfo, regionInfo *core.RegionInfo) FlowItemTask { +func NewCheckPeerTask(peerInfo *core.PeerInfo, regionInfo *core.RegionInfo) flowItemTask { return &checkPeerTask{ peerInfo: peerInfo, regionInfo: regionInfo, @@ -54,10 +54,10 @@ func (t *checkPeerTask) taskType() flowItemTaskKind { return checkPeerTaskType } -func (t *checkPeerTask) runTask(flow *hotPeerCache) { - stat := flow.CheckPeerFlow(t.peerInfo, t.regionInfo) +func (t *checkPeerTask) runTask(cache *hotPeerCache) { + stat := cache.checkPeerFlow(t.peerInfo, t.regionInfo) if stat != nil { - update(stat, flow) + updateStat(cache, stat) } } @@ -66,7 +66,7 @@ type checkExpiredTask struct { } // NewCheckExpiredItemTask creates task to collect expired items -func NewCheckExpiredItemTask(region *core.RegionInfo) FlowItemTask { +func NewCheckExpiredItemTask(region *core.RegionInfo) flowItemTask { return &checkExpiredTask{ region: region, } @@ -76,25 +76,25 @@ func (t *checkExpiredTask) taskType() flowItemTaskKind { return checkExpiredTaskType } -func (t *checkExpiredTask) runTask(flow *hotPeerCache) { - expiredStats := flow.CollectExpiredItems(t.region) +func (t *checkExpiredTask) runTask(cache *hotPeerCache) { + expiredStats := cache.collectExpiredItems(t.region) for _, stat := range expiredStats { - update(stat, flow) + updateStat(cache, stat) } } type collectUnReportedPeerTask struct { - storeID uint64 - regionIDs map[uint64]struct{} - interval uint64 + storeID uint64 + regions map[uint64]*core.RegionInfo + interval uint64 } // NewCollectUnReportedPeerTask creates task to collect unreported peers -func NewCollectUnReportedPeerTask(storeID uint64, regionIDs map[uint64]struct{}, interval uint64) FlowItemTask { +func NewCollectUnReportedPeerTask(storeID uint64, regions map[uint64]*core.RegionInfo, interval uint64) flowItemTask { return &collectUnReportedPeerTask{ - storeID: storeID, - regionIDs: 
regionIDs, - interval: interval, + storeID: storeID, + regions: regions, + interval: interval, } } @@ -102,10 +102,10 @@ func (t *collectUnReportedPeerTask) taskType() flowItemTaskKind { return collectUnReportedPeerTaskType } -func (t *collectUnReportedPeerTask) runTask(flow *hotPeerCache) { - stats := flow.CheckColdPeer(t.storeID, t.regionIDs, t.interval) +func (t *collectUnReportedPeerTask) runTask(cache *hotPeerCache) { + stats := cache.checkColdPeer(t.storeID, t.regions, t.interval) for _, stat := range stats { - update(stat, flow) + updateStat(cache, stat) } } @@ -125,8 +125,8 @@ func (t *collectRegionStatsTask) taskType() flowItemTaskKind { return collectRegionStatsTaskType } -func (t *collectRegionStatsTask) runTask(flow *hotPeerCache) { - t.ret <- flow.RegionStats(t.minDegree) +func (t *collectRegionStatsTask) runTask(cache *hotPeerCache) { + t.ret <- cache.RegionStats(t.minDegree) } // TODO: do we need a wait-return timeout? @@ -157,8 +157,8 @@ func (t *isRegionHotTask) taskType() flowItemTaskKind { return isRegionHotTaskType } -func (t *isRegionHotTask) runTask(flow *hotPeerCache) { - t.ret <- flow.isRegionHotWithAnyPeers(t.region, t.minHotDegree) +func (t *isRegionHotTask) runTask(cache *hotPeerCache) { + t.ret <- cache.isRegionHotWithAnyPeers(t.region, t.minHotDegree) } // TODO: do we need a wait-return timeout? @@ -185,6 +185,6 @@ func (t *collectMetricsTask) taskType() flowItemTaskKind { return collectMetricsTaskType } -func (t *collectMetricsTask) runTask(flow *hotPeerCache) { - flow.CollectMetrics(t.typ) +func (t *collectMetricsTask) runTask(cache *hotPeerCache) { + cache.collectMetrics(t.typ) } diff --git a/server/statistics/hot_peer.go b/server/statistics/hot_peer.go index e7055a38af30..0bebaaa7a5f3 100644 --- a/server/statistics/hot_peer.go +++ b/server/statistics/hot_peer.go @@ -33,48 +33,48 @@ const ( type dimStat struct { typ RegionStatKind - Rolling *movingaverage.TimeMedian // it's used to statistic hot degree and average speed. 
- LastAverage *movingaverage.AvgOverTime // it's used to obtain the average speed in last second as instantaneous speed. + rolling *movingaverage.TimeMedian // it's used to statistic hot degree and average speed. + lastAverage *movingaverage.AvgOverTime // it's used to obtain the average speed in last second as instantaneous speed. } func newDimStat(typ RegionStatKind, reportInterval time.Duration) *dimStat { return &dimStat{ typ: typ, - Rolling: movingaverage.NewTimeMedian(DefaultAotSize, rollingWindowsSize, reportInterval), - LastAverage: movingaverage.NewAvgOverTime(reportInterval), + rolling: movingaverage.NewTimeMedian(DefaultAotSize, rollingWindowsSize, reportInterval), + lastAverage: movingaverage.NewAvgOverTime(reportInterval), } } func (d *dimStat) Add(delta float64, interval time.Duration) { - d.LastAverage.Add(delta, interval) - d.Rolling.Add(delta, interval) + d.lastAverage.Add(delta, interval) + d.rolling.Add(delta, interval) } func (d *dimStat) isLastAverageHot(threshold float64) bool { - return d.LastAverage.Get() >= threshold + return d.lastAverage.Get() >= threshold } func (d *dimStat) isHot(threshold float64) bool { - return d.Rolling.Get() >= threshold + return d.rolling.Get() >= threshold } func (d *dimStat) isFull() bool { - return d.LastAverage.IsFull() + return d.lastAverage.IsFull() } func (d *dimStat) clearLastAverage() { - d.LastAverage.Clear() + d.lastAverage.Clear() } func (d *dimStat) Get() float64 { - return d.Rolling.Get() + return d.rolling.Get() } func (d *dimStat) Clone() *dimStat { return &dimStat{ typ: d.typ, - Rolling: d.Rolling.Clone(), - LastAverage: d.LastAverage.Clone(), + rolling: d.rolling.Clone(), + lastAverage: d.lastAverage.Clone(), } } @@ -97,11 +97,8 @@ type HotPeerStat struct { // LastUpdateTime used to calculate average write LastUpdateTime time.Time `json:"last_update_time"` - needDelete bool - isLeader bool - isNew bool - // TODO: remove it when we send peer stat by store info - justTransferLeader bool + 
actionType ActionType + isLeader bool interval uint64 thresholds []float64 peers []uint64 @@ -140,10 +137,9 @@ func (stat *HotPeerStat) Log(str string, level func(msg string, fields ...zap.Fi zap.Int("hot-degree", stat.HotDegree), zap.Int("hot-anti-count", stat.AntiCount), zap.Duration("sum-interval", stat.getIntervalSum()), - zap.Bool("need-delete", stat.IsNeedDelete()), zap.String("source", stat.source.String()), zap.Bool("allow-adopt", stat.allowAdopt), - zap.Bool("just-transfer-leader", stat.justTransferLeader), + zap.String("action-type", stat.actionType.String()), zap.Time("last-transfer-leader-time", stat.lastTransferLeaderTime)) } @@ -152,22 +148,17 @@ func (stat *HotPeerStat) IsNeedCoolDownTransferLeader(minHotDegree int) bool { return time.Since(stat.lastTransferLeaderTime).Seconds() < float64(minHotDegree*stat.hotStatReportInterval()) } -// IsNeedDelete to delete the item in cache. -func (stat *HotPeerStat) IsNeedDelete() bool { - return stat.needDelete -} - // IsLeader indicates the item belong to the leader. func (stat *HotPeerStat) IsLeader() bool { return stat.isLeader } -// IsNew indicates the item is first update in the cache of the region. -func (stat *HotPeerStat) IsNew() bool { - return stat.isNew +// GetActionType returns the item action type. +func (stat *HotPeerStat) GetActionType() ActionType { + return stat.actionType } -// GetLoad returns denoised load if possible. +// GetLoad returns denoising load if possible. func (stat *HotPeerStat) GetLoad(k RegionStatKind) float64 { if len(stat.rollingLoads) > int(k) { return math.Round(stat.rollingLoads[int(k)].Get()) @@ -175,7 +166,7 @@ func (stat *HotPeerStat) GetLoad(k RegionStatKind) float64 { return math.Round(stat.Loads[int(k)]) } -// GetLoads returns denoised load if possible. +// GetLoads returns denoising load if possible. 
func (stat *HotPeerStat) GetLoads() []float64 { regionStats := stat.Kind.RegionStats() loads := make([]float64, len(regionStats)) @@ -185,7 +176,8 @@ func (stat *HotPeerStat) GetLoads() []float64 { return loads } -// GetThresholds returns thresholds +// GetThresholds returns thresholds. +// Only for test purpose. func (stat *HotPeerStat) GetThresholds() []float64 { return stat.thresholds } @@ -201,9 +193,9 @@ func (stat *HotPeerStat) Clone() *HotPeerStat { return &ret } -func (stat *HotPeerStat) isFullAndHot() bool { +func (stat *HotPeerStat) isHot() bool { return slice.AnyOf(stat.rollingLoads, func(i int) bool { - return stat.rollingLoads[i].isFull() && stat.rollingLoads[i].isLastAverageHot(stat.thresholds[i]) + return stat.rollingLoads[i].isLastAverageHot(stat.thresholds[i]) }) } @@ -224,5 +216,5 @@ func (stat *HotPeerStat) getIntervalSum() time.Duration { if len(stat.rollingLoads) == 0 || stat.rollingLoads[0] == nil { return 0 } - return stat.rollingLoads[0].LastAverage.GetIntervalSum() + return stat.rollingLoads[0].lastAverage.GetIntervalSum() } diff --git a/server/statistics/hot_peer_cache.go b/server/statistics/hot_peer_cache.go index c5a08db5dd8d..7de9a8ceaca3 100644 --- a/server/statistics/hot_peer_cache.go +++ b/server/statistics/hot_peer_cache.go @@ -61,6 +61,7 @@ type hotPeerCache struct { inheritItem map[uint64]*HotPeerStat // regionID -> HotPeerStat topNTTL time.Duration reportIntervalSecs int + taskQueue chan flowItemTask } // NewHotPeerCache creates a hotPeerCache @@ -71,6 +72,7 @@ func NewHotPeerCache(kind RWType) *hotPeerCache { storesOfRegion: make(map[uint64]map[uint64]struct{}), regionsOfStore: make(map[uint64]map[uint64]struct{}), inheritItem: make(map[uint64]*HotPeerStat), + taskQueue: make(chan flowItemTask, queueCap), } if kind == Write { c.reportIntervalSecs = WriteReportInterval @@ -98,14 +100,14 @@ func (f *hotPeerCache) RegionStats(minHotDegree int) map[uint64][]*HotPeerStat { return res } -// Update updates the items in statistics. 
-func (f *hotPeerCache) Update(item *HotPeerStat) { - if item.IsNeedDelete() { +// update updates the items in statistics. +func (f *hotPeerCache) update(item *HotPeerStat) { + if item.actionType == Remove { if item.AntiCount > 0 { // means it's deleted because expired rather than cold f.putInheritItem(item) } f.removeItem(item) - item.Log("region heartbeat delete from cache", log.Debug) + item.Log("region heartbeat remove from cache", log.Debug) } else { f.putItem(item) item.Log("region heartbeat update", log.Debug) @@ -136,15 +138,15 @@ func (f *hotPeerCache) collectPeerMetrics(loads []float64, interval uint64) { } } -// CollectExpiredItems collects expired items, mark them as needDelete and puts them into inherit items -func (f *hotPeerCache) CollectExpiredItems(region *core.RegionInfo) []*HotPeerStat { +// collectExpiredItems collects expired items and marks them as Remove, so that update evicts them from the cache and keeps the ones with AntiCount > 0 as inherit items. +func (f *hotPeerCache) collectExpiredItems(region *core.RegionInfo) []*HotPeerStat { regionID := region.GetID() items := make([]*HotPeerStat, 0) for _, storeID := range f.getAllStoreIDs(region) { if region.GetStorePeer(storeID) == nil { item := f.getOldHotPeerStat(regionID, storeID) if item != nil { - item.needDelete = true + item.actionType = Remove items = append(items, item) } } @@ -152,10 +154,10 @@ func (f *hotPeerCache) CollectExpiredItems(region *core.RegionInfo) []*HotPeerSt return items } -// CheckPeerFlow checks the flow information of a peer. -// Notice: CheckPeerFlow couldn't be used concurrently. -// CheckPeerFlow will update oldItem's rollingLoads into newItem, thus we should use write lock here. +// checkPeerFlow checks the flow information of a peer. +// Notice: checkPeerFlow couldn't be used concurrently. +// checkPeerFlow will update oldItem's rollingLoads into newItem, thus we should use write lock here. 
+func (f *hotPeerCache) checkPeerFlow(peer *core.PeerInfo, region *core.RegionInfo) *HotPeerStat { interval := peer.GetInterval() if Denoising && interval < HotRegionReportMinInterval { return nil @@ -167,7 +169,6 @@ func (f *hotPeerCache) CheckPeerFlow(peer *core.PeerInfo, region *core.RegionInf for i := range deltaLoads { loads[i] = deltaLoads[i] / float64(interval) } - justTransferLeader := f.justTransferLeader(region) oldItem := f.getOldHotPeerStat(region.GetID(), storeID) thresholds := f.calcHotThresholds(storeID) regionPeers := region.GetPeers() @@ -176,18 +177,17 @@ func (f *hotPeerCache) CheckPeerFlow(peer *core.PeerInfo, region *core.RegionInf peers = append(peers, peer.StoreId) } newItem := &HotPeerStat{ - StoreID: storeID, - RegionID: region.GetID(), - Kind: f.kind, - Loads: loads, - LastUpdateTime: time.Now(), - needDelete: false, - isLeader: region.GetLeader().GetStoreId() == storeID, - justTransferLeader: justTransferLeader, - interval: interval, - peers: peers, - thresholds: thresholds, - source: direct, + StoreID: storeID, + RegionID: region.GetID(), + Kind: f.kind, + Loads: loads, + LastUpdateTime: time.Now(), + isLeader: region.GetLeader().GetStoreId() == storeID, + interval: interval, + peers: peers, + actionType: Update, + thresholds: thresholds, + source: direct, } if oldItem == nil { @@ -205,11 +205,11 @@ func (f *hotPeerCache) CheckPeerFlow(peer *core.PeerInfo, region *core.RegionInf } } } - return f.updateHotPeerStat(newItem, oldItem, deltaLoads, time.Duration(interval)*time.Second) + return f.updateHotPeerStat(region, newItem, oldItem, deltaLoads, time.Duration(interval)*time.Second) } -// CheckColdPeer checks the collect the un-heartbeat peer and maintain it. -func (f *hotPeerCache) CheckColdPeer(storeID uint64, reportRegions map[uint64]struct{}, interval uint64) (ret []*HotPeerStat) { +// checkColdPeer collects the previously hot peers that are absent from reportRegions (i.e. got no heartbeat) and maintains their stats. 
+func (f *hotPeerCache) checkColdPeer(storeID uint64, reportRegions map[uint64]*core.RegionInfo, interval uint64) (ret []*HotPeerStat) { if Denoising && interval < HotRegionReportMinInterval { return } @@ -218,7 +218,7 @@ func (f *hotPeerCache) CheckColdPeer(storeID uint64, reportRegions map[uint64]st return } for regionID := range previousHotStat { - if _, ok := reportRegions[regionID]; !ok { + if region, ok := reportRegions[regionID]; !ok { oldItem := f.getOldHotPeerStat(regionID, storeID) if oldItem == nil { continue @@ -228,21 +228,20 @@ func (f *hotPeerCache) CheckColdPeer(storeID uint64, reportRegions map[uint64]st RegionID: regionID, Kind: f.kind, // use oldItem.thresholds to make the newItem won't affect the threshold - Loads: oldItem.thresholds, - LastUpdateTime: time.Now(), - needDelete: false, - isLeader: oldItem.isLeader, - justTransferLeader: oldItem.justTransferLeader, - interval: interval, - peers: oldItem.peers, - thresholds: oldItem.thresholds, - inCold: true, + Loads: oldItem.thresholds, + LastUpdateTime: time.Now(), + isLeader: oldItem.isLeader, + interval: interval, + peers: oldItem.peers, + actionType: Update, + thresholds: oldItem.thresholds, + inCold: true, } deltaLoads := make([]float64, RegionStatCount) for i, loads := range oldItem.thresholds { deltaLoads[i] = loads * float64(interval) } - stat := f.updateHotPeerStat(newItem, oldItem, deltaLoads, time.Duration(interval)*time.Second) + stat := f.updateHotPeerStat(region, newItem, oldItem, deltaLoads, time.Duration(interval)*time.Second) if stat != nil { ret = append(ret, stat) } @@ -251,7 +250,7 @@ func (f *hotPeerCache) CheckColdPeer(storeID uint64, reportRegions map[uint64]st return } -func (f *hotPeerCache) CollectMetrics(typ string) { +func (f *hotPeerCache) collectMetrics(typ string) { for storeID, peers := range f.peersOfStore { store := storeTag(storeID) thresholds := f.calcHotThresholds(storeID) @@ -338,6 +337,9 @@ func (f *hotPeerCache) isOldColdPeer(oldItem *HotPeerStat, storeID 
uint64) bool } func (f *hotPeerCache) justTransferLeader(region *core.RegionInfo) bool { + if region == nil { + return false + } ids, ok := f.storesOfRegion[region.GetID()] if ok { for storeID := range ids { @@ -379,10 +381,10 @@ func (f *hotPeerCache) getDefaultTimeMedian() *movingaverage.TimeMedian { return movingaverage.NewTimeMedian(DefaultAotSize, rollingWindowsSize, time.Duration(f.reportIntervalSecs)*time.Second) } -func (f *hotPeerCache) updateHotPeerStat(newItem, oldItem *HotPeerStat, deltaLoads []float64, interval time.Duration) *HotPeerStat { +func (f *hotPeerCache) updateHotPeerStat(region *core.RegionInfo, newItem, oldItem *HotPeerStat, deltaLoads []float64, interval time.Duration) *HotPeerStat { regionStats := f.kind.RegionStats() if oldItem == nil { - return f.updateNewHotPeerStat(newItem, deltaLoads, interval) + return f.updateNewHotPeerStat(regionStats, newItem, deltaLoads, interval) } if newItem.source == adopt { @@ -395,14 +397,14 @@ func (f *hotPeerCache) updateHotPeerStat(newItem, oldItem *HotPeerStat, deltaLoa newItem.allowAdopt = oldItem.allowAdopt } - if newItem.justTransferLeader { + if f.justTransferLeader(region) { newItem.lastTransferLeaderTime = time.Now() // skip the first heartbeat flow statistic after transfer leader, because its statistics are calculated by the last leader in this store and are inaccurate // maintain anticount and hotdegree to avoid store threshold and hot peer are unstable. // For write stat, as the stat is send by region heartbeat, the first heartbeat will be skipped. // For read stat, as the stat is send by store heartbeat, the first heartbeat won't be skipped. 
if newItem.Kind == Write { - inheritItemDegree(newItem, oldItem) + inheritItem(newItem, oldItem) return newItem } } else { @@ -416,7 +418,7 @@ func (f *hotPeerCache) updateHotPeerStat(newItem, oldItem *HotPeerStat, deltaLoa isFull := newItem.rollingLoads[0].isFull() // The intervals of dims are the same, so it is only necessary to determine whether any of them if !isFull { // not update hot degree and anti count - inheritItemDegree(newItem, oldItem) + inheritItem(newItem, oldItem) } else { // If item is inCold, it means the pd didn't recv this item in the store heartbeat, // thus we make it colder @@ -424,13 +426,13 @@ func (f *hotPeerCache) updateHotPeerStat(newItem, oldItem *HotPeerStat, deltaLoa coldItem(newItem, oldItem) } else { if f.isOldColdPeer(oldItem, newItem.StoreID) { - if newItem.isFullAndHot() { - initItemDegree(newItem) + if newItem.isHot() { + initItem(newItem) } else { - newItem.needDelete = true + newItem.actionType = Remove } } else { - if newItem.isFullAndHot() { + if newItem.isHot() { hotItem(newItem, oldItem) } else { coldItem(newItem, oldItem) @@ -442,8 +444,7 @@ func (f *hotPeerCache) updateHotPeerStat(newItem, oldItem *HotPeerStat, deltaLoa return newItem } -func (f *hotPeerCache) updateNewHotPeerStat(newItem *HotPeerStat, deltaLoads []float64, interval time.Duration) *HotPeerStat { - regionStats := f.kind.RegionStats() +func (f *hotPeerCache) updateNewHotPeerStat(regionStats []RegionStatKind, newItem *HotPeerStat, deltaLoads []float64, interval time.Duration) *HotPeerStat { if interval == 0 { return nil } @@ -454,9 +455,9 @@ func (f *hotPeerCache) updateNewHotPeerStat(newItem *HotPeerStat, deltaLoads []f return nil } if interval.Seconds() >= float64(f.reportIntervalSecs) { - initItemDegree(newItem) + initItem(newItem) } - newItem.isNew = true + newItem.actionType = Add newItem.rollingLoads = make([]*dimStat, len(regionStats)) for i, k := range regionStats { ds := newDimStat(k, time.Duration(newItem.hotStatReportInterval())*time.Second) @@ 
-522,7 +523,7 @@ func coldItem(newItem, oldItem *HotPeerStat) { newItem.HotDegree = oldItem.HotDegree - 1 newItem.AntiCount = oldItem.AntiCount - 1 if newItem.AntiCount <= 0 { - newItem.needDelete = true + newItem.actionType = Remove } else { newItem.allowAdopt = true } @@ -537,7 +538,7 @@ func hotItem(newItem, oldItem *HotPeerStat) { } } -func initItemDegree(item *HotPeerStat) { +func initItem(item *HotPeerStat) { item.HotDegree = 1 item.AntiCount = hotRegionAntiCount item.allowAdopt = true @@ -546,7 +547,7 @@ func initItemDegree(item *HotPeerStat) { } } -func inheritItemDegree(newItem, oldItem *HotPeerStat) { +func inheritItem(newItem, oldItem *HotPeerStat) { newItem.HotDegree = oldItem.HotDegree newItem.AntiCount = oldItem.AntiCount } diff --git a/server/statistics/hot_peer_cache_test.go b/server/statistics/hot_peer_cache_test.go index c3ce13f5fb5b..3ef1c03ed126 100644 --- a/server/statistics/hot_peer_cache_test.go +++ b/server/statistics/hot_peer_cache_test.go @@ -72,17 +72,17 @@ type testCacheCase struct { kind RWType operator operator expect int - needDelete bool + actionType ActionType } func (t *testHotPeerCache) TestCache(c *C) { tests := []*testCacheCase{ - {Read, transferLeader, 3, false}, - {Read, movePeer, 4, true}, - {Read, addReplica, 4, false}, - {Write, transferLeader, 3, true}, - {Write, movePeer, 4, true}, - {Write, addReplica, 4, true}, + {Read, transferLeader, 3, Update}, + {Read, movePeer, 4, Remove}, + {Read, addReplica, 4, Update}, + {Write, transferLeader, 3, Remove}, + {Write, movePeer, 4, Remove}, + {Write, addReplica, 4, Remove}, } for _, t := range tests { testCache(c, t) @@ -97,13 +97,13 @@ func testCache(c *C, t *testCacheCase) { cache := NewHotPeerCache(t.kind) region := buildRegion(t.kind, 3, 60) checkAndUpdate(c, cache, region, defaultSize[t.kind]) - checkHit(c, cache, region, t.kind, false) // all peers are new + checkHit(c, cache, region, t.kind, Add) // all peers are new srcStore, region := schedule(c, t.operator, region, 10) 
res := checkAndUpdate(c, cache, region, t.expect) - checkHit(c, cache, region, t.kind, true) // hit cache + checkHit(c, cache, region, t.kind, Update) // hit cache if t.expect != defaultSize[t.kind] { - checkNeedDelete(c, res, srcStore, t.needDelete) + checkOp(c, res, srcStore, t.actionType) } } @@ -122,10 +122,10 @@ func orderingPeers(cache *hotPeerCache, region *core.RegionInfo) []*metapb.Peer func checkFlow(cache *hotPeerCache, region *core.RegionInfo, peers []*metapb.Peer) (res []*HotPeerStat) { reportInterval := region.GetInterval() interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() - res = append(res, cache.CollectExpiredItems(region)...) + res = append(res, cache.collectExpiredItems(region)...) for _, peer := range peers { peerInfo := core.NewPeerInfo(peer, region.GetLoads(), interval) - item := cache.CheckPeerFlow(peerInfo, region) + item := cache.checkPeerFlow(peerInfo, region) if item != nil { res = append(res, item) } @@ -135,7 +135,7 @@ func checkFlow(cache *hotPeerCache, region *core.RegionInfo, peers []*metapb.Pee func updateFlow(cache *hotPeerCache, res []*HotPeerStat) []*HotPeerStat { for _, p := range res { - cache.Update(p) + cache.update(p) } return res } @@ -168,7 +168,7 @@ func checkAndUpdateSkipOne(c *C, cache *hotPeerCache, region *core.RegionInfo, e return updateFlow(cache, res) } -func checkHit(c *C, cache *hotPeerCache, region *core.RegionInfo, kind RWType, isHit bool) { +func checkHit(c *C, cache *hotPeerCache, region *core.RegionInfo, kind RWType, actionType ActionType) { var peers []*metapb.Peer if kind == Read { peers = []*metapb.Peer{region.GetLeader()} @@ -178,14 +178,14 @@ func checkHit(c *C, cache *hotPeerCache, region *core.RegionInfo, kind RWType, i for _, peer := range peers { item := cache.getOldHotPeerStat(region.GetID(), peer.StoreId) c.Assert(item, NotNil) - c.Assert(item.isNew, Equals, !isHit) + c.Assert(item.actionType, Equals, actionType) } } -func checkNeedDelete(c *C, ret []*HotPeerStat, 
storeID uint64, needDelete bool) { +func checkOp(c *C, ret []*HotPeerStat, storeID uint64, actionType ActionType) { for _, item := range ret { if item.StoreID == storeID { - c.Assert(item.needDelete, Equals, needDelete) + c.Assert(item.actionType, Equals, actionType) return } } @@ -296,55 +296,55 @@ func (t *testHotPeerCache) TestUpdateHotPeerStat(c *C) { m := RegionHeartBeatReportInterval / StoreHeartBeatReportInterval // skip interval=0 - newItem := &HotPeerStat{needDelete: false, thresholds: []float64{0.0, 0.0, 0.0}, Kind: Read} - newItem = cache.updateHotPeerStat(newItem, nil, []float64{0.0, 0.0, 0.0}, 0) + newItem := &HotPeerStat{actionType: Update, thresholds: []float64{0.0, 0.0, 0.0}, Kind: Read} + newItem = cache.updateHotPeerStat(nil, newItem, nil, []float64{0.0, 0.0, 0.0}, 0) c.Check(newItem, IsNil) // new peer, interval is larger than report interval, but no hot - newItem = &HotPeerStat{needDelete: false, thresholds: []float64{1.0, 1.0, 1.0}, Kind: Read} - newItem = cache.updateHotPeerStat(newItem, nil, []float64{0.0, 0.0, 0.0}, 10*time.Second) + newItem = &HotPeerStat{actionType: Update, thresholds: []float64{1.0, 1.0, 1.0}, Kind: Read} + newItem = cache.updateHotPeerStat(nil, newItem, nil, []float64{0.0, 0.0, 0.0}, 10*time.Second) c.Check(newItem, IsNil) // new peer, interval is less than report interval - newItem = &HotPeerStat{needDelete: false, thresholds: []float64{0.0, 0.0, 0.0}, Kind: Read} - newItem = cache.updateHotPeerStat(newItem, nil, []float64{60.0, 60.0, 60.0}, 4*time.Second) + newItem = &HotPeerStat{actionType: Update, thresholds: []float64{0.0, 0.0, 0.0}, Kind: Read} + newItem = cache.updateHotPeerStat(nil, newItem, nil, []float64{60.0, 60.0, 60.0}, 4*time.Second) c.Check(newItem, NotNil) c.Check(newItem.HotDegree, Equals, 0) c.Check(newItem.AntiCount, Equals, 0) // sum of interval is less than report interval oldItem := newItem - newItem = cache.updateHotPeerStat(newItem, oldItem, []float64{60.0, 60.0, 60.0}, 4*time.Second) + newItem = 
cache.updateHotPeerStat(nil, newItem, oldItem, []float64{60.0, 60.0, 60.0}, 4*time.Second) c.Check(newItem.HotDegree, Equals, 0) c.Check(newItem.AntiCount, Equals, 0) // sum of interval is larger than report interval, and hot oldItem = newItem - newItem = cache.updateHotPeerStat(newItem, oldItem, []float64{60.0, 60.0, 60.0}, 4*time.Second) + newItem = cache.updateHotPeerStat(nil, newItem, oldItem, []float64{60.0, 60.0, 60.0}, 4*time.Second) c.Check(newItem.HotDegree, Equals, 1) c.Check(newItem.AntiCount, Equals, 2*m) // sum of interval is less than report interval oldItem = newItem - newItem = cache.updateHotPeerStat(newItem, oldItem, []float64{60.0, 60.0, 60.0}, 4*time.Second) + newItem = cache.updateHotPeerStat(nil, newItem, oldItem, []float64{60.0, 60.0, 60.0}, 4*time.Second) c.Check(newItem.HotDegree, Equals, 1) c.Check(newItem.AntiCount, Equals, 2*m) // sum of interval is larger than report interval, and hot oldItem = newItem - newItem = cache.updateHotPeerStat(newItem, oldItem, []float64{60.0, 60.0, 60.0}, 10*time.Second) + newItem = cache.updateHotPeerStat(nil, newItem, oldItem, []float64{60.0, 60.0, 60.0}, 10*time.Second) c.Check(newItem.HotDegree, Equals, 2) c.Check(newItem.AntiCount, Equals, 2*m) // sum of interval is larger than report interval, and cold oldItem = newItem newItem.thresholds = []float64{10.0, 10.0, 10.0} - newItem = cache.updateHotPeerStat(newItem, oldItem, []float64{60.0, 60.0, 60.0}, 10*time.Second) + newItem = cache.updateHotPeerStat(nil, newItem, oldItem, []float64{60.0, 60.0, 60.0}, 10*time.Second) c.Check(newItem.HotDegree, Equals, 1) c.Check(newItem.AntiCount, Equals, 2*m-1) // sum of interval is larger than report interval, and cold for i := 0; i < 2*m-1; i++ { oldItem = newItem - newItem = cache.updateHotPeerStat(newItem, oldItem, []float64{60.0, 60.0, 60.0}, 10*time.Second) + newItem = cache.updateHotPeerStat(nil, newItem, oldItem, []float64{60.0, 60.0, 60.0}, 10*time.Second) } c.Check(newItem.HotDegree, Less, 0) 
c.Check(newItem.AntiCount, Equals, 0) - c.Check(newItem.needDelete, IsTrue) + c.Check(newItem.actionType, Equals, Remove) } func (t *testHotPeerCache) TestThresholdWithUpdateHotPeerStat(c *C) { @@ -369,7 +369,7 @@ func (t *testHotPeerCache) testMetrics(c *C, interval, byteRate, expectThreshold Kind: cache.kind, StoreID: storeID, RegionID: i, - needDelete: false, + actionType: Update, thresholds: thresholds, Loads: make([]float64, DimLen), } @@ -379,8 +379,8 @@ func (t *testHotPeerCache) testMetrics(c *C, interval, byteRate, expectThreshold if oldItem != nil && oldItem.rollingLoads[RegionReadBytes].isHot(thresholds[RegionReadBytes]) == true { break } - item := cache.updateHotPeerStat(newItem, oldItem, []float64{byteRate * interval, 0.0, 0.0}, time.Duration(interval)*time.Second) - cache.Update(item) + item := cache.updateHotPeerStat(nil, newItem, oldItem, []float64{byteRate * interval, 0.0, 0.0}, time.Duration(interval)*time.Second) + cache.update(item) } thresholds := cache.calcHotThresholds(storeID) if i < TopNN { @@ -493,13 +493,13 @@ func BenchmarkCheckRegionFlow(b *testing.B) { for i := 0; i < b.N; i++ { items := make([]*HotPeerStat, 0) for _, peerInfo := range peerInfos { - item := cache.CheckPeerFlow(peerInfo, region) + item := cache.checkPeerFlow(peerInfo, region) if item != nil { items = append(items, item) } } for _, ret := range items { - cache.Update(ret) + cache.update(ret) } } } diff --git a/server/statistics/kind.go b/server/statistics/kind.go index e5965fcbfcbd..65ec300f7ac6 100644 --- a/server/statistics/kind.go +++ b/server/statistics/kind.go @@ -147,3 +147,25 @@ func (k RWType) RegionStats() []RegionStatKind { } return nil } + +// ActionType indicates the action type for the stat item. +type ActionType int + +// Flags for action type. 
+const ( + Add ActionType = iota + Remove + Update +) + +func (t ActionType) String() string { + switch t { + case Add: + return "add" + case Remove: + return "remove" + case Update: + return "update" + } + return "unimplemented" +} From 35a02c68995c0b10872cd3cc2a8f1e6fcfec4c7f Mon Sep 17 00:00:00 2001 From: Yongbo Jiang Date: Wed, 29 Dec 2021 12:05:50 +0800 Subject: [PATCH 4/6] api, pdctl: add HTTP Component signature (#4491) * close #4490 Signed-off-by: Cabinfever_B * Revert "close #4483: fix failpoint" This reverts commit 9fef69a1a693e8a4de0a44736db4490ef9a3d7e2. Signed-off-by: Cabinfever_B * Revert "close #4483" This reverts commit dd14b0b699d2535280a5aecbfb0e688fe4d70a83. Signed-off-by: Cabinfever_B * Revert "close #4483: add failpoint" This reverts commit 810f1f5fa17c71c660b3df3f269c8e9a2fabffab. Signed-off-by: Cabinfever_B * Revert "close #4483: add integration test" This reverts commit 974deb553c2fbbcc03aa50947e639012a3f408f4. Signed-off-by: Cabinfever_B * Revert "close #4373 : add framework" This reverts commit 01fb757473ea7b2ce342dd2bca66c2a76266b89b. 
Signed-off-by: Cabinfever_B * close #4490: add teset Signed-off-by: Cabinfever_B * close #4490: add teset Signed-off-by: Cabinfever_B * close #4490: fix transport and change component key Signed-off-by: Cabinfever_B * close #4490: add comment Signed-off-by: Cabinfever_B * Update pkg/apiutil/apiutil.go Co-authored-by: disksing Signed-off-by: Cabinfever_B * Update pkg/apiutil/apiutil.go Co-authored-by: disksing Signed-off-by: Cabinfever_B * close #4490: fix exported var problem Signed-off-by: Cabinfever_B * close #4490 Signed-off-by: Cabinfever_B * close #4490 Signed-off-by: Cabinfever_B Co-authored-by: disksing Co-authored-by: Ti Chi Robot --- pkg/apiutil/apiutil.go | 39 ++++++++++++++ tests/pdctl/global_test.go | 79 ++++++++++++++++++++++++++++ tests/pdctl/helper.go | 9 ++++ tools/pd-ctl/pdctl/command/global.go | 11 ++-- 4 files changed, 134 insertions(+), 4 deletions(-) create mode 100644 tests/pdctl/global_test.go diff --git a/pkg/apiutil/apiutil.go b/pkg/apiutil/apiutil.go index 2c61ed45f288..0a904dcc7c18 100644 --- a/pkg/apiutil/apiutil.go +++ b/pkg/apiutil/apiutil.go @@ -27,6 +27,14 @@ import ( "github.com/unrolled/render" ) +var ( + // componentSignatureKey is used for http request header key + // to identify component signature + componentSignatureKey = "component" + // componentAnonymousValue identifies anonymous request source + componentAnonymousValue = "anonymous" +) + // DeferClose captures the error returned from closing (if an error occurs). // This is designed to be used in a defer statement. 
func DeferClose(c io.Closer, err *error) { @@ -127,3 +135,34 @@ func ErrorResp(rd *render.Render, w http.ResponseWriter, err error) { rd.JSON(w, http.StatusInternalServerError, err.Error()) } } + +// GetComponentNameOnHTTP returns component name from Request Header +func GetComponentNameOnHTTP(r *http.Request) string { + componentName := r.Header.Get(componentSignatureKey) + if len(componentName) == 0 { + componentName = componentAnonymousValue + } + return componentName +} + +// ComponentSignatureRoundTripper is used to add component signature in HTTP header +type ComponentSignatureRoundTripper struct { + proxied http.RoundTripper + component string +} + +// NewComponentSignatureRoundTripper returns a new ComponentSignatureRoundTripper. +func NewComponentSignatureRoundTripper(roundTripper http.RoundTripper, componentName string) *ComponentSignatureRoundTripper { + return &ComponentSignatureRoundTripper{ + proxied: roundTripper, + component: componentName, + } +} + +// RoundTrip is used to implement RoundTripper +func (rt *ComponentSignatureRoundTripper) RoundTrip(req *http.Request) (resp *http.Response, err error) { + req.Header.Add(componentSignatureKey, rt.component) + // Send the request, get the response and the error + resp, err = rt.proxied.RoundTrip(req) + return +} diff --git a/tests/pdctl/global_test.go b/tests/pdctl/global_test.go new file mode 100644 index 000000000000..bb14eeafac25 --- /dev/null +++ b/tests/pdctl/global_test.go @@ -0,0 +1,79 @@ +// Copyright 2021 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package pdctl + +import ( + "context" + "fmt" + "net/http" + "testing" + + . "github.com/pingcap/check" + "github.com/pingcap/log" + "github.com/tikv/pd/pkg/apiutil" + "github.com/tikv/pd/pkg/testutil" + "github.com/tikv/pd/server" + cmd "github.com/tikv/pd/tools/pd-ctl/pdctl" + "go.uber.org/zap" +) + +func Test(t *testing.T) { + TestingT(t) +} + +var _ = Suite(&globalTestSuite{}) + +type globalTestSuite struct{} + +func (s *globalTestSuite) SetUpSuite(c *C) { + server.EnableZap = true +} + +func (s *globalTestSuite) TestSendAndGetComponent(c *C) { + handler := func(ctx context.Context, s *server.Server) (http.Handler, server.ServiceGroup, error) { + mux := http.NewServeMux() + mux.HandleFunc("/pd/api/v1/health", func(w http.ResponseWriter, r *http.Request) { + component := apiutil.GetComponentNameOnHTTP(r) + for k := range r.Header { + log.Info("header", zap.String("key", k)) + } + log.Info("component", zap.String("component", component)) + c.Assert(component, Equals, "pdctl") + fmt.Fprint(w, component) + }) + info := server.ServiceGroup{ + IsCore: true, + } + return mux, info, nil + } + cfg := server.NewTestSingleConfig(checkerWithNilAssert(c)) + ctx, cancel := context.WithCancel(context.Background()) + svr, err := server.CreateServer(ctx, cfg, handler) + c.Assert(err, IsNil) + err = svr.Run() + c.Assert(err, IsNil) + pdAddr := svr.GetAddr() + defer func() { + cancel() + svr.Close() + testutil.CleanServer(svr.GetConfig().DataDir) + }() + + cmd := cmd.GetRootCmd() + args := []string{"-u", pdAddr, "health"} + output, err := ExecuteCommand(cmd, args...) 
+ c.Assert(err, IsNil) + c.Assert(string(output), Equals, "pdctl\n") +} diff --git a/tests/pdctl/helper.go b/tests/pdctl/helper.go index 66e675edb7f7..75e7d19c288b 100644 --- a/tests/pdctl/helper.go +++ b/tests/pdctl/helper.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/spf13/cobra" + "github.com/tikv/pd/pkg/assertutil" "github.com/tikv/pd/server" "github.com/tikv/pd/server/api" "github.com/tikv/pd/server/core" @@ -113,3 +114,11 @@ func MustPutRegion(c *check.C, cluster *tests.TestCluster, regionID, storeID uin c.Assert(err, check.IsNil) return r } + +func checkerWithNilAssert(c *check.C) *assertutil.Checker { + checker := assertutil.NewChecker(c.FailNow) + checker.IsNil = func(obtained interface{}) { + c.Assert(obtained, check.IsNil) + } + return checker +} diff --git a/tools/pd-ctl/pdctl/command/global.go b/tools/pd-ctl/pdctl/command/global.go index bc6dcaa82eac..8f679daf5169 100644 --- a/tools/pd-ctl/pdctl/command/global.go +++ b/tools/pd-ctl/pdctl/command/global.go @@ -25,11 +25,15 @@ import ( "github.com/pingcap/errors" "github.com/spf13/cobra" + "github.com/tikv/pd/pkg/apiutil" "go.etcd.io/etcd/pkg/transport" ) var ( - dialClient = &http.Client{} + pdControllerComponentName = "pdctl" + dialClient = &http.Client{ + Transport: apiutil.NewComponentSignatureRoundTripper(http.DefaultTransport, pdControllerComponentName), + } pingPrefix = "pd/api/v1/ping" ) @@ -46,9 +50,8 @@ func InitHTTPSClient(caPath, certPath, keyPath string) error { } dialClient = &http.Client{ - Transport: &http.Transport{ - TLSClientConfig: tlsConfig, - }, + Transport: apiutil.NewComponentSignatureRoundTripper( + &http.Transport{TLSClientConfig: tlsConfig}, pdControllerComponentName), } return nil From 53ca795455635cef34205de8f537d0814db38b68 Mon Sep 17 00:00:00 2001 From: ShuNing Date: Wed, 29 Dec 2021 19:25:50 +0800 Subject: [PATCH 5/6] config: fix the display value of the log level (#4519) Signed-off-by: nolouch --- 
server/config/config.go | 2 +- server/config/config_test.go | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/server/config/config.go b/server/config/config.go index 930d459b0f34..74f1f8c05fa3 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -187,7 +187,7 @@ func NewConfig() *Config { fs.StringVar(&cfg.Metric.PushAddress, "metrics-addr", "", "prometheus pushgateway address, leaves it empty will disable prometheus push") - fs.StringVar(&cfg.Log.Level, "L", "", "log level: debug, info, warn, error, fatal (default 'info')") + fs.StringVar(&cfg.Log.Level, "L", "info", "log level: debug, info, warn, error, fatal (default 'info')") fs.StringVar(&cfg.Log.File.Filename, "log-file", "", "log file path") fs.StringVar(&cfg.Security.CAPath, "cacert", "", "path of file that contains list of trusted TLS CAs") diff --git a/server/config/config_test.go b/server/config/config_test.go index a0353828689c..1b4ec18b2f4f 100644 --- a/server/config/config_test.go +++ b/server/config/config_test.go @@ -194,6 +194,7 @@ leader-schedule-limit = 0 c.Assert(cfg.Schedule.LeaderScheduleLimit, Equals, uint64(0)) // When undefined, use default values. 
c.Assert(cfg.PreVote, IsTrue) + c.Assert(cfg.Log.Level, Equals, "info") c.Assert(cfg.Schedule.MaxMergeRegionKeys, Equals, uint64(defaultMaxMergeRegionKeys)) c.Assert(cfg.PDServerCfg.MetricStorage, Equals, "http://127.0.0.1:9090") From ae23d409c528a836ae6d98cd174689c38ad19f2d Mon Sep 17 00:00:00 2001 From: ShuNing Date: Wed, 29 Dec 2021 20:19:50 +0800 Subject: [PATCH 6/6] config: enable record history hot region (#4521) * config: enable record history hot region Signed-off-by: nolouch * add test Signed-off-by: nolouch Co-authored-by: Ti Chi Robot --- server/config/config.go | 2 +- server/config/config_test.go | 10 ++++++++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/server/config/config.go b/server/config/config.go index 74f1f8c05fa3..0ce34cc42e32 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -786,7 +786,7 @@ const ( defaultEnableJointConsensus = true defaultEnableCrossTableMerge = true defaultHotRegionsWriteInterval = 10 * time.Minute - defaultHotRegionsReservedDays = 0 + defaultHotRegionsReservedDays = 7 ) func (c *ScheduleConfig) adjust(meta *configMetaData, reloading bool) error { diff --git a/server/config/config_test.go b/server/config/config_test.go index 1b4ec18b2f4f..cb3c29eb3f39 100644 --- a/server/config/config_test.go +++ b/server/config/config_test.go @@ -465,7 +465,7 @@ wait-store-timeout = "120s" c.Assert(cfg.ReplicationMode.ReplicationMode, Equals, "majority") } -func (s *testConfigSuite) TestHotRegionConfig(c *C) { +func (s *testConfigSuite) TestHotHistoryRegionConfig(c *C) { cfgData := ` [schedule] hot-regions-reserved-days= 30 @@ -476,8 +476,14 @@ hot-regions-write-interval= "30m" c.Assert(err, IsNil) err = cfg.Adjust(&meta, false) c.Assert(err, IsNil) - c.Assert(cfg.Schedule.HotRegionsWriteInterval.Duration, Equals, time.Minute*30) + c.Assert(cfg.Schedule.HotRegionsWriteInterval.Duration, Equals, 30*time.Minute) c.Assert(cfg.Schedule.HotRegionsReservedDays, Equals, int64(30)) + // Verify default value + 
cfg = NewConfig() + err = cfg.Adjust(nil, false) + c.Assert(err, IsNil) + c.Assert(cfg.Schedule.HotRegionsWriteInterval.Duration, Equals, 10*time.Minute) + c.Assert(cfg.Schedule.HotRegionsReservedDays, Equals, int64(7)) } func (s *testConfigSuite) TestConfigClone(c *C) {