Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support dumping metrics by instance #470

Merged
merged 3 commits into from
May 21, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 104 additions & 58 deletions collector/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"fmt"
"io"
"maps"
"net/http"
"net/url"
"os"
Expand Down Expand Up @@ -320,7 +321,7 @@ func (c *MetricCollectOptions) Collect(m *Manager, topo *models.TiDBCluster) err

tsEnd, _ := utils.ParseTime(c.GetBaseOptions().ScrapeEnd)
tsStart, _ := utils.ParseTime(c.GetBaseOptions().ScrapeBegin)
collectMetric(m.logger, client, key, tsStart, tsEnd, mtc, c.label, c.resultDir, c.limit, c.compress, c.customHeader)
collectMetric(m.logger, client, key, tsStart, tsEnd, mtc, c.label, c.resultDir, c.limit, c.compress, c.customHeader, "")

mu.Lock()
done++
Expand All @@ -341,55 +342,59 @@ func (c *MetricCollectOptions) Collect(m *Manager, topo *models.TiDBCluster) err
return nil
}

func getMetricList(c *http.Client, prom string, customHeader []string) ([]string, error) {
req, err := http.NewRequest(
http.MethodGet,
fmt.Sprintf("http://%s/api/v1/label/__name__/values", prom),
nil)
if err != nil {
return nil, err
}
utils.AddHeaders(req.Header, customHeader)
resp, err := c.Do(req)
if err != nil {
return []string{}, err
}
defer resp.Body.Close()
func getMetricList(c *http.Client, addr string, customHeader []string) ([]string, error) {
return getAPIData[[]string](c, makeURL(addr, "/api/v1/label/__name__/values", nil), customHeader)
}

r := struct {
Metrics []string `json:"data"`
}{}
if err := json.NewDecoder(resp.Body).Decode(&r); err != nil {
return []string{}, err
}
return r.Metrics, nil
func getInstanceList(c *http.Client, addr string, queries map[string]string, customHeader []string) ([]string, error) {
return getAPIData[[]string](c, makeURL(addr, "/api/v1/label/instance/values", queries), customHeader)
}

func getSeriesNum(c *http.Client, promAddr, query string, customHeader []string) (int, error) {
req, err := http.NewRequest(
http.MethodGet,
fmt.Sprintf("http://%s/api/v1/series?match[]=%s", promAddr, query),
nil)
func getSeriesNum(c *http.Client, addr string, queries map[string]string, customHeader []string) (int, error) {
series, err := getAPIData[[]interface{}](c, makeURL(addr, "/api/v1/series", queries), customHeader)
if err != nil {
return 0, err
}
return len(series), nil
}

func getAPIData[T any](c *http.Client, url string, customHeader []string) (T, error) {
var body struct {
Data T `json:"data"`
}
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return body.Data, err
}
utils.AddHeaders(req.Header, customHeader)
resp, err := c.Do(req)
if err != nil {
return 0, err
return body.Data, err
}
defer resp.Body.Close()
if resp.StatusCode/100 != 2 {
return 0, fmt.Errorf("Status Code %d", resp.StatusCode)
msg, err := io.ReadAll(resp.Body)
if err != nil {
return body.Data, fmt.Errorf("[%d] failed read body: %v", resp.StatusCode, err)
}
return body.Data, fmt.Errorf("[%d] %s", resp.StatusCode, string(msg))
}
defer resp.Body.Close()
if err := json.NewDecoder(resp.Body).Decode(&body); err != nil {
return body.Data, err
}
return body.Data, nil
}

r := struct {
Series []interface{} `json:"data"`
}{}
if err := json.NewDecoder(resp.Body).Decode(&r); err != nil {
return 0, err
func makeURL(addr string, path string, queries map[string]string) string {
link := "http://" + addr + path
if len(queries) == 0 {
return link
}
return len(r.Series), nil
vals := make(url.Values, len(queries))
for k, v := range queries {
vals[k] = []string{v}
}
return link + "?" + vals.Encode()
}

func collectMetric(
Expand All @@ -403,14 +408,27 @@ func collectMetric(
speedlimit int,
compress bool,
customHeader []string,
instance string,
) {
nameSuffix := ""
if len(instance) > 0 {
nameSuffix = "." + strings.ReplaceAll(instance, ":", "-")
}
query := generateQueryWitLabel(mtc, label)
l.Debugf("Querying series of %s...", mtc)
queries := map[string]string{
"match[]": query,
"start": beginTime.Format(time.RFC3339),
"end": endTime.Format(time.RFC3339),
}
l.Debugf("Querying series of %s...", mtc+nameSuffix)

var series int
if err := tiuputils.Retry(
func() error {
seriesNum, err := getSeriesNum(c, promAddr, query, customHeader)
// if len(instance) == 0 && rand.Float64() < 0.9 {
// return errors.New("mock error")
// }
seriesNum, err := getSeriesNum(c, promAddr, queries, customHeader)
series = seriesNum
return err
},
Expand All @@ -420,12 +438,41 @@ func collectMetric(
Timeout: c.Timeout*3 + 5*time.Second, //make sure the retry timeout is longer than the api timeout
},
); err != nil {
l.Errorf("Failed to get series of %s: %s", mtc, err)
l.Errorf("Failed to get series of %s: %s", mtc+nameSuffix, err)
if len(instance) == 0 {
// try by-instance dumping
var instances []string
if err := tiuputils.Retry(
func() error {
instanceLst, err := getInstanceList(c, promAddr, queries, customHeader)
instances = instanceLst
return err
},
tiuputils.RetryOption{
Attempts: 3,
Delay: time.Microsecond * 300,
Timeout: c.Timeout*3 + 5*time.Second, //make sure the retry timeout is longer than the api timeout
},
); err != nil {
l.Errorf("Failed to get instances of %s: %s", mtc, err)
return
}
if len(instances) == 0 {
l.Warnf("No instance found for %s", mtc)
return
}
for _, instance := range instances {
newLabel := make(map[string]string)
maps.Copy(newLabel, label)
newLabel["instance"] = instance
collectMetric(l, c, promAddr, beginTime, endTime, mtc, newLabel, resultDir, speedlimit, compress, customHeader, instance)
}
}
return
}

if series <= 0 {
l.Debugf("metric %s has %d series, ignore", mtc, series)
l.Debugf("metric %s has %d series, ignore", mtc+nameSuffix, series)
return
}

Expand All @@ -441,7 +488,7 @@ func collectMetric(
block = minQueryRange
}

l.Debugf("Dumping metric %s-%s-%s...", mtc, beginTime.Format(time.RFC3339), endTime.Format(time.RFC3339))
l.Debugf("Dumping metric %s-%s-%s%s...", mtc, beginTime.Format(time.RFC3339), endTime.Format(time.RFC3339), nameSuffix)
for queryEnd := endTime; queryEnd.After(beginTime); queryEnd = queryEnd.Add(time.Duration(-block) * time.Second) {
querySec := block
queryBegin := queryEnd.Add(time.Duration(-block) * time.Second)
Expand All @@ -464,23 +511,23 @@ func collectMetric(
utils.AddHeaders(req.Header, customHeader)
resp, err := c.Do(req)
if err != nil {
l.Errorf("failed query metric %s: %s, retry...", mtc, err)
l.Errorf("failed query metric %s: %s, retry...", mtc+nameSuffix, err)
return err
}
// Prometheus API response format is JSON. Every successful API request returns a 2xx status code.
if resp.StatusCode/100 != 2 {
l.Errorf("failed query metric %s: Status Code %d, retry...", mtc, resp.StatusCode)
l.Errorf("failed query metric %s: Status Code %d, retry...", mtc+nameSuffix, resp.StatusCode)
}
defer resp.Body.Close()

dst, err := os.Create(
filepath.Join(
resultDir, subdirMonitor, subdirMetrics, strings.ReplaceAll(promAddr, ":", "-"),
fmt.Sprintf("%s_%s_%s.json", mtc, queryBegin.Format(time.RFC3339), queryEnd.Format(time.RFC3339)),
fmt.Sprintf("%s-%s-%s%s.json", mtc, queryBegin.Format(time.RFC3339), queryEnd.Format(time.RFC3339), nameSuffix),
),
)
if err != nil {
l.Errorf("collect metric %s: %s, retry...", mtc, err)
l.Errorf("collect metric %s: %s, retry...", mtc+nameSuffix, err)
}
defer dst.Close()

Expand All @@ -490,7 +537,7 @@ func collectMetric(
// compress the metric
enc, err = zstd.NewWriter(dst)
if err != nil {
l.Errorf("failed compressing metric %s: %s, retry...\n", mtc, err)
l.Errorf("failed compressing metric %s: %s, retry...\n", mtc+nameSuffix, err)
return err
}
defer enc.Close()
Expand All @@ -499,10 +546,10 @@ func collectMetric(
}
n, err = io.Copy(enc, resp.Body)
if err != nil {
l.Errorf("failed writing metric %s to file: %s, retry...\n", mtc, err)
l.Errorf("failed writing metric %s to file: %s, retry...\n", mtc+nameSuffix, err)
return err
}
l.Debugf(" Dumped metric %s from %s to %s (%d bytes)", mtc, queryBegin.Format(time.RFC3339), queryEnd.Format(time.RFC3339), n)
l.Debugf(" Dumped metric %s from %s to %s (%d bytes)", mtc+nameSuffix, queryBegin.Format(time.RFC3339), queryEnd.Format(time.RFC3339), n)
return nil
},
tiuputils.RetryOption{
Expand All @@ -511,7 +558,7 @@ func collectMetric(
Timeout: c.Timeout*3 + 5*time.Second, //make sure the retry timeout is longer than the api timeout
},
); err != nil {
l.Errorf("Error quering metrics %s: %s", mtc, err)
l.Errorf("Error quering metrics %s: %s", mtc+nameSuffix, err)
}
}
}
Expand Down Expand Up @@ -539,15 +586,14 @@ func filterMetrics(src, filter []string) []string {
}

func generateQueryWitLabel(metric string, labels map[string]string) string {
query := metric
if len(labels) > 0 {
query += "{"
for k, v := range labels {
query = fmt.Sprintf("%s%s=\"%s\",", query, k, v)
}
query = query[:len(query)-1] + "}"
}
return query
buf := new(strings.Builder)
buf.WriteString("{")
fmt.Fprintf(buf, "__name__=%q", metric)
for name, value := range labels {
fmt.Fprintf(buf, ",%s=%q", name, value)
}
buf.WriteString("}")
return buf.String()
}

// TSDBCollectOptions is the options collecting TSDB file of prometheus, only work for tiup-cluster deployed cluster
Expand Down
Loading