Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into support-dynamic-distro
Browse files Browse the repository at this point in the history
  • Loading branch information
baurine committed Dec 30, 2021
2 parents 45046e0 + ae23d40 commit f16a8ce
Show file tree
Hide file tree
Showing 22 changed files with 431 additions and 256 deletions.
39 changes: 39 additions & 0 deletions pkg/apiutil/apiutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ import (
"github.com/unrolled/render"
)

var (
// componentSignatureKey is used for http request header key
// to identify component signature
componentSignatureKey = "component"
// componentAnonymousValue identifies anonymous request source
componentAnonymousValue = "anonymous"
)

// DeferClose captures the error returned from closing (if an error occurs).
// This is designed to be used in a defer statement.
func DeferClose(c io.Closer, err *error) {
Expand Down Expand Up @@ -127,3 +135,34 @@ func ErrorResp(rd *render.Render, w http.ResponseWriter, err error) {
rd.JSON(w, http.StatusInternalServerError, err.Error())
}
}

// GetComponentNameOnHTTP returns component name from Request Header
func GetComponentNameOnHTTP(r *http.Request) string {
componentName := r.Header.Get(componentSignatureKey)
if len(componentName) == 0 {
componentName = componentAnonymousValue
}
return componentName
}

// ComponentSignatureRoundTripper is used to add component signature in HTTP header
type ComponentSignatureRoundTripper struct {
proxied http.RoundTripper
component string
}

// NewComponentSignatureRoundTripper returns a new ComponentSignatureRoundTripper.
func NewComponentSignatureRoundTripper(roundTripper http.RoundTripper, componentName string) *ComponentSignatureRoundTripper {
return &ComponentSignatureRoundTripper{
proxied: roundTripper,
component: componentName,
}
}

// RoundTrip is used to implement RoundTripper
func (rt *ComponentSignatureRoundTripper) RoundTrip(req *http.Request) (resp *http.Response, err error) {
req.Header.Add(componentSignatureKey, rt.component)
// Send the request, get the response and the error
resp, err = rt.proxied.RoundTrip(req)
return
}
15 changes: 9 additions & 6 deletions pkg/apiutil/serverapi/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@ import (
"io"
"net/http"
"net/url"
"strings"

"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/server"
"github.com/tikv/pd/server/config"
"github.com/urfave/negroni"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -113,11 +111,16 @@ func (h *redirector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http
http.Error(w, "no leader", http.StatusServiceUnavailable)
return
}
clientUrls := leader.GetClientUrls()
urls := make([]url.URL, 0, len(clientUrls))
for _, item := range clientUrls {
u, err := url.Parse(item)
if err != nil {
http.Error(w, errs.ErrURLParse.Wrap(err).GenWithStackByCause().Error(), http.StatusInternalServerError)
return
}

urls, err := config.ParseUrls(strings.Join(leader.GetClientUrls(), ","))
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
urls = append(urls, *u)
}
client := h.s.GetHTTPClient()
NewCustomReverseProxies(client, urls).ServeHTTP(w, r)
Expand Down
10 changes: 9 additions & 1 deletion pkg/mock/mockcluster/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (mc *Cluster) RegionWriteStats() map[uint64][]*statistics.HotPeerStat {

// HotRegionsFromStore picks hot regions in specify store.
func (mc *Cluster) HotRegionsFromStore(store uint64, kind statistics.RWType) []*core.RegionInfo {
stats := mc.HotCache.HotRegionsFromStore(store, kind, mc.GetHotRegionCacheHitsThreshold())
stats := hotRegionsFromStore(mc.HotCache, store, kind, mc.GetHotRegionCacheHitsThreshold())
regions := make([]*core.RegionInfo, 0, len(stats))
for _, stat := range stats {
region := mc.GetRegion(stat.RegionID)
Expand All @@ -141,6 +141,14 @@ func (mc *Cluster) HotRegionsFromStore(store uint64, kind statistics.RWType) []*
return regions
}

// hotRegionsFromStore picks hot region in specify store.
func hotRegionsFromStore(w *statistics.HotCache, storeID uint64, kind statistics.RWType, minHotDegree int) []*statistics.HotPeerStat {
if stats, ok := w.RegionStats(kind, minHotDegree)[storeID]; ok && len(stats) > 0 {
return stats
}
return nil
}

// AllocPeer allocs a new peer on a store.
func (mc *Cluster) AllocPeer(storeID uint64) (*metapb.Peer, error) {
peerID, err := mc.AllocID()
Expand Down
1 change: 1 addition & 0 deletions server/api/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,7 @@ func (s *testGetRegionRangeHolesSuite) TestRegionRangeHoles(c *C) {
{"", core.HexRegionKeyStr(r1.GetStartKey())},
{core.HexRegionKeyStr(r1.GetEndKey()), core.HexRegionKeyStr(r3.GetStartKey())},
{core.HexRegionKeyStr(r4.GetEndKey()), core.HexRegionKeyStr(r6.GetStartKey())},
{core.HexRegionKeyStr(r6.GetEndKey()), ""},
})
}

Expand Down
6 changes: 3 additions & 3 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,11 +594,11 @@ func (c *RaftCluster) HandleStoreHeartbeat(stats *pdpb.StoreStats) error {
c.limiter.Collect(newStore.GetStoreStats())
}

regionIDs := make(map[uint64]struct{}, len(stats.GetPeerStats()))
regions := make(map[uint64]*core.RegionInfo, len(stats.GetPeerStats()))
for _, peerStat := range stats.GetPeerStats() {
regionID := peerStat.GetRegionId()
regionIDs[regionID] = struct{}{}
region := c.GetRegion(regionID)
regions[regionID] = region
if region == nil {
log.Warn("discard hot peer stat for unknown region",
zap.Uint64("region-id", regionID),
Expand All @@ -624,7 +624,7 @@ func (c *RaftCluster) HandleStoreHeartbeat(stats *pdpb.StoreStats) error {
peerInfo := core.NewPeerInfo(peer, loads, interval)
c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(peerInfo, region))
}
c.hotStat.CheckReadAsync(statistics.NewCollectUnReportedPeerTask(storeID, regionIDs, interval))
c.hotStat.CheckReadAsync(statistics.NewCollectUnReportedPeerTask(storeID, regions, interval))
return nil
}

Expand Down
29 changes: 6 additions & 23 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func NewConfig() *Config {

fs.StringVar(&cfg.Metric.PushAddress, "metrics-addr", "", "prometheus pushgateway address, leaves it empty will disable prometheus push")

fs.StringVar(&cfg.Log.Level, "L", "", "log level: debug, info, warn, error, fatal (default 'info')")
fs.StringVar(&cfg.Log.Level, "L", "info", "log level: debug, info, warn, error, fatal (default 'info')")
fs.StringVar(&cfg.Log.File.Filename, "log-file", "", "log file path")

fs.StringVar(&cfg.Security.CAPath, "cacert", "", "path of file that contains list of trusted TLS CAs")
Expand Down Expand Up @@ -786,7 +786,7 @@ const (
defaultEnableJointConsensus = true
defaultEnableCrossTableMerge = true
defaultHotRegionsWriteInterval = 10 * time.Minute
defaultHotRegionsReservedDays = 0
defaultHotRegionsReservedDays = 7
)

func (c *ScheduleConfig) adjust(meta *configMetaData, reloading bool) error {
Expand Down Expand Up @@ -1199,23 +1199,6 @@ func (c LabelPropertyConfig) Clone() LabelPropertyConfig {
return m
}

// ParseUrls parse a string into multiple urls.
// Export for api.
func ParseUrls(s string) ([]url.URL, error) {
items := strings.Split(s, ",")
urls := make([]url.URL, 0, len(items))
for _, item := range items {
u, err := url.Parse(item)
if err != nil {
return nil, errs.ErrURLParse.Wrap(err).GenWithStackByCause()
}

urls = append(urls, *u)
}

return urls, nil
}

// SetupLogger setup the logger.
func (c *Config) SetupLogger() error {
lg, p, err := log.InitLogger(&c.Log, zap.AddStacktrace(zapcore.FatalLevel))
Expand Down Expand Up @@ -1283,22 +1266,22 @@ func (c *Config) GenEmbedEtcdConfig() (*embed.Config, error) {
cfg.Logger = "zap"
var err error

cfg.LPUrls, err = ParseUrls(c.PeerUrls)
cfg.LPUrls, err = parseUrls(c.PeerUrls)
if err != nil {
return nil, err
}

cfg.APUrls, err = ParseUrls(c.AdvertisePeerUrls)
cfg.APUrls, err = parseUrls(c.AdvertisePeerUrls)
if err != nil {
return nil, err
}

cfg.LCUrls, err = ParseUrls(c.ClientUrls)
cfg.LCUrls, err = parseUrls(c.ClientUrls)
if err != nil {
return nil, err
}

cfg.ACUrls, err = ParseUrls(c.AdvertiseClientUrls)
cfg.ACUrls, err = parseUrls(c.AdvertiseClientUrls)
if err != nil {
return nil, err
}
Expand Down
11 changes: 9 additions & 2 deletions server/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ leader-schedule-limit = 0
c.Assert(cfg.Schedule.LeaderScheduleLimit, Equals, uint64(0))
// When undefined, use default values.
c.Assert(cfg.PreVote, IsTrue)
c.Assert(cfg.Log.Level, Equals, "info")
c.Assert(cfg.Schedule.MaxMergeRegionKeys, Equals, uint64(defaultMaxMergeRegionKeys))
c.Assert(cfg.PDServerCfg.MetricStorage, Equals, "http://127.0.0.1:9090")

Expand Down Expand Up @@ -464,7 +465,7 @@ wait-store-timeout = "120s"
c.Assert(cfg.ReplicationMode.ReplicationMode, Equals, "majority")
}

func (s *testConfigSuite) TestHotRegionConfig(c *C) {
func (s *testConfigSuite) TestHotHistoryRegionConfig(c *C) {
cfgData := `
[schedule]
hot-regions-reserved-days= 30
Expand All @@ -475,8 +476,14 @@ hot-regions-write-interval= "30m"
c.Assert(err, IsNil)
err = cfg.Adjust(&meta, false)
c.Assert(err, IsNil)
c.Assert(cfg.Schedule.HotRegionsWriteInterval.Duration, Equals, time.Minute*30)
c.Assert(cfg.Schedule.HotRegionsWriteInterval.Duration, Equals, 30*time.Minute)
c.Assert(cfg.Schedule.HotRegionsReservedDays, Equals, int64(30))
// Verify default value
cfg = NewConfig()
err = cfg.Adjust(nil, false)
c.Assert(err, IsNil)
c.Assert(cfg.Schedule.HotRegionsWriteInterval.Duration, Equals, 10*time.Minute)
c.Assert(cfg.Schedule.HotRegionsReservedDays, Equals, int64(7))
}

func (s *testConfigSuite) TestConfigClone(c *C) {
Expand Down
18 changes: 18 additions & 0 deletions server/config/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ package config
import (
"net/url"
"regexp"
"strings"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/tikv/pd/pkg/errs"
)

const (
Expand Down Expand Up @@ -87,3 +89,19 @@ func NewTestOptions() *PersistOptions {
c.Adjust(nil, false)
return NewPersistOptions(c)
}

// parseUrls parse a string into multiple urls.
func parseUrls(s string) ([]url.URL, error) {
items := strings.Split(s, ",")
urls := make([]url.URL, 0, len(items))
for _, item := range items {
u, err := url.Parse(item)
if err != nil {
return nil, errs.ErrURLParse.Wrap(err).GenWithStackByCause()
}

urls = append(urls, *u)
}

return urls, nil
}
4 changes: 4 additions & 0 deletions server/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -1143,6 +1143,10 @@ func (r *RegionsInfo) GetRangeHoles() [][]string {
lastEndKey = region.GetEndKey()
return true
})
// If the last end key is not empty, it means there is a range hole at the end.
if len(lastEndKey) > 0 {
rangeHoles = append(rangeHoles, []string{HexRegionKeyStr(lastEndKey), ""})
}
return rangeHoles
}

Expand Down
2 changes: 1 addition & 1 deletion server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -999,7 +999,7 @@ func (h *Handler) packHotRegions(hotPeersStat statistics.StoreHotPeersStat, hotR
}
}
stat := core.HistoryHotRegion{
// store in ms.
// store in ms.
UpdateTime: hotPeerStat.LastUpdateTime.UnixNano() / int64(time.Millisecond),
RegionID: hotPeerStat.RegionID,
StoreID: hotPeerStat.StoreID,
Expand Down
6 changes: 3 additions & 3 deletions server/schedulers/hot_region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1552,7 +1552,7 @@ func (s *testHotCacheSuite) TestCheckRegionFlow(c *C) {
c.Check(len(items), Greater, 0)
for _, item := range items {
if item.StoreID == 3 {
c.Check(item.IsNeedDelete(), IsTrue)
c.Check(item.GetActionType(), Equals, statistics.Remove)
continue
}
c.Check(item.HotDegree, Equals, testcase.DegreeAfterTransferLeader+2)
Expand Down Expand Up @@ -1586,9 +1586,9 @@ func (s *testHotCacheSuite) TestCheckRegionFlowWithDifferentThreshold(c *C) {
items = tc.AddLeaderRegionWithWriteInfo(201, 1, rate*statistics.WriteReportInterval, 0, 0, statistics.WriteReportInterval, []uint64{3, 4}, 1)
for _, item := range items {
if item.StoreID < 4 {
c.Check(item.IsNeedDelete(), IsTrue)
c.Check(item.GetActionType(), Equals, statistics.Remove)
} else {
c.Check(item.IsNeedDelete(), IsFalse)
c.Check(item.GetActionType(), Equals, statistics.Update)
}
}
}
Expand Down
Loading

0 comments on commit f16a8ce

Please sign in to comment.