Skip to content

Commit

Permalink
Add collect tsdb (#429)
Browse files Browse the repository at this point in the history
* support collect raw prometheus data

* only collect from first prometheus

* ignore chunks_head when not needed

* fix

* update tiup
  • Loading branch information
nexustar authored Feb 15, 2023
1 parent b05d398 commit c0cacb0
Show file tree
Hide file tree
Showing 11 changed files with 1,030 additions and 43 deletions.
1 change: 1 addition & 0 deletions cmd/diag/command/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ func newCollectCmd() *cobra.Command {
cmd.Flags().BoolVar(&cOpt.CompressScp, "compress-scp", true, "Compress when transfer config and logs.Only works with system ssh")
cmd.Flags().BoolVar(&cOpt.CompressMetrics, "compress-metrics", true, "Compress collected metrics data.")
cmd.Flags().BoolVar(&cOpt.ExitOnError, "exit-on-error", false, "Stop collecting and exit if an error occurs.")
cmd.Flags().BoolVar(&cOpt.RawMonitor, "raw-monitor", false, "Collect raw prometheus data")
cmd.Flags().StringVar(&cOpt.ExplainSQLPath, "explain-sql", "", "File path for explain sql")
cmd.Flags().StringVar(&cOpt.CurrDB, "db", "", "default db for plan replayer collector")

Expand Down
1 change: 1 addition & 0 deletions cmd/diag/command/collectdm.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ func newCollectDMCmd() *cobra.Command {
cmd.Flags().BoolVar(&cOpt.CompressScp, "compress-scp", true, "Compress when transfer config and logs.Only works with system ssh")
cmd.Flags().BoolVar(&cOpt.CompressMetrics, "compress-metrics", true, "Compress collected metrics data.")
cmd.Flags().BoolVar(&cOpt.ExitOnError, "exit-on-error", false, "Stop collecting and exit if an error occurs.")
cmd.Flags().BoolVar(&cOpt.RawMonitor, "raw-monitor", false, "Collect raw prometheus data")

return cmd
}
2 changes: 2 additions & 0 deletions cmd/scraper/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func init() {
rootCmd.Flags().StringSliceVar(&logtype, "logtype", []string{"std", "slow"}, "type of log files to scrap")
rootCmd.Flags().StringSliceVar(&opt.ConfigPaths, "config", nil, "paths of config files to scrap")
rootCmd.Flags().StringSliceVar(&opt.FilePaths, "file", nil, "paths of normal files to scrap")
rootCmd.Flags().StringVar(&opt.PrometheusDataDir, "prometheus", "", "paths of prometheus datadir")
rootCmd.Flags().StringVarP(&opt.Start, "from", "f", "", "start time of range to scrap, only apply to logs")
rootCmd.Flags().StringVarP(&opt.End, "to", "t", "", "start time of range to scrap, only apply to logs")

Expand All @@ -76,6 +77,7 @@ func Execute() {
err := rootCmd.Execute()
if err != nil {
code = 1
fmt.Println(err)
}

if code != 0 {
Expand Down
14 changes: 14 additions & 0 deletions cmd/scraper/scrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,20 @@ func Scrap(opt *scraper.Option) (*scraper.Sample, error) {
scrapers = append(scrapers, s)
}

if opt.PrometheusDataDir != "" {
s := &scraper.TSDBScraper{
Paths: []string{opt.PrometheusDataDir},
}
var err error
if s.Start, err = utils.ParseTime(opt.Start); err != nil {
return nil, err
}
if s.End, err = utils.ParseTime(opt.End); err != nil {
return nil, err
}
scrapers = append(scrapers, s)
}

result := &scraper.Sample{}
for _, s := range scrapers {
if err := s.Scrap(result); err != nil {
Expand Down
15 changes: 14 additions & 1 deletion collector/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ type CollectOptions struct {
PerfDuration int //seconds: profile time(s), default is 30s.
CompressScp bool // compress of files during collecting
CompressMetrics bool // compress of files during collecting
RawMonitor bool // collect raw data for metrics
ExitOnError bool // break the process and exit when an error occur
ExtendedAttrs map[string]string // extended attributes used for manual collecting mode
ExplainSQLPath string // File path for explain sql
Expand Down Expand Up @@ -273,7 +274,7 @@ func (m *Manager) CollectClusterInfo(
compress: cOpt.CompressMetrics,
})
}
if canCollect(&cOpt.Collectors.Monitor.Metric) {
if canCollect(&cOpt.Collectors.Monitor.Metric) && !cOpt.RawMonitor {
collectors = append(collectors,
&MetricCollectOptions{ // metrics
BaseOptions: opt,
Expand All @@ -286,6 +287,18 @@ func (m *Manager) CollectClusterInfo(
},
)
}
if canCollect(&cOpt.Collectors.Monitor.Metric) && cOpt.RawMonitor {
collectors = append(collectors,
&TSDBCollectOptions{ // metrics
BaseOptions: opt,
opt: gOpt,
resultDir: resultDir,
fileStats: make(map[string][]CollectStat),
limit: cOpt.Limit,
compress: cOpt.CompressScp,
},
)
}

// populate SSH credentials if needed
if (m.mode == CollectModeTiUP || m.mode == CollectModeManual) &&
Expand Down
6 changes: 6 additions & 0 deletions collector/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,12 @@ func parseScraperSamples(ctx context.Context, host string) (map[string][]Collect
Size: v,
})
}
for k, v := range s.TSDB {
stats[host] = append(stats[host], CollectStat{
Target: k,
Size: v,
})
}

return stats, nil
}
260 changes: 260 additions & 0 deletions collector/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package collector

import (
"context"
"fmt"
"io"
"net/http"
Expand All @@ -26,12 +27,18 @@ import (
"sync"
"time"

"github.com/joomcode/errorx"
json "github.com/json-iterator/go"
"github.com/klauspost/compress/zstd"
"github.com/pingcap/diag/pkg/models"
"github.com/pingcap/diag/pkg/utils"
perrs "github.com/pingcap/errors"
"github.com/pingcap/tiup/pkg/cluster/ctxt"
operator "github.com/pingcap/tiup/pkg/cluster/operation"
"github.com/pingcap/tiup/pkg/cluster/spec"
"github.com/pingcap/tiup/pkg/cluster/task"
logprinter "github.com/pingcap/tiup/pkg/logger/printer"
"github.com/pingcap/tiup/pkg/set"
"github.com/pingcap/tiup/pkg/tui/progress"
tiuputils "github.com/pingcap/tiup/pkg/utils"
)
Expand All @@ -40,6 +47,7 @@ const (
subdirMonitor = "monitor"
subdirAlerts = "alerts"
subdirMetrics = "metrics"
subdirRaw = "raw"
maxQueryRange = 120 * 60 // 120min
minQueryRange = 5 * 60 // 5min
)
Expand Down Expand Up @@ -506,3 +514,255 @@ func generateQueryWitLabel(metric string, labels map[string]string) string {
}
return query
}

// TSDBCollectOptions is the options collecting TSDB file of prometheus, only work for tiup-cluster deployed cluster
type TSDBCollectOptions struct {
	*BaseOptions
	opt       *operator.Options        // global operations from cli
	resultDir string                   // local directory where collected files are stored
	fileStats map[string][]CollectStat // host -> files discovered by Prepare's scraper dry run, copied by Collect
	compress  bool                     // compress files while copying (passed to CopyFile)
	limit     int                      // transfer rate limit passed to CopyFile — presumably Kbit/s; confirm against tiup's CopyFile
}

// Desc implements the Collector interface; it returns a short human-readable
// description of what this collector gathers.
func (c *TSDBCollectOptions) Desc() string {
	const desc = "metrics from Prometheus node"
	return desc
}

// GetBaseOptions implements the Collector interface; it returns the embedded
// base options shared by all collectors.
func (c *TSDBCollectOptions) GetBaseOptions() *BaseOptions {
	return c.BaseOptions
}

// SetBaseOptions implements the Collector interface; it replaces the embedded
// base options with the given one.
func (c *TSDBCollectOptions) SetBaseOptions(opt *BaseOptions) {
	c.BaseOptions = opt
}

// SetGlobalOperations sets the global operation fields from the cli.
func (c *TSDBCollectOptions) SetGlobalOperations(opt *operator.Options) {
	c.opt = opt
}

// SetDir sets the result directory path where collected files are written.
func (c *TSDBCollectOptions) SetDir(dir string) {
	c.resultDir = dir
}

// Prepare implements the Collector interface. It performs a dry run for the
// TSDB collection: it deploys the diag scraper tool to every Prometheus host
// and runs it there to enumerate the TSDB files (and their sizes) that would
// be collected, populating c.fileStats for the later Collect phase.
// Only supported in tiup-cluster mode; in other modes it is a no-op.
func (c *TSDBCollectOptions) Prepare(m *Manager, cls *models.TiDBCluster) (map[string][]CollectStat, error) {
	if m.mode != CollectModeTiUP {
		return nil, nil
	}
	// Nothing to do without a Prometheus node in the topology.
	if len(cls.Monitors) < 1 {
		if m.logger.GetDisplayMode() == logprinter.DisplayModeDefault {
			fmt.Println("No Prometheus node found in topology, skip.")
		} else {
			m.logger.Warnf("No Prometheus node found in topology, skip.")
		}
		return nil, nil
	}

	// tsEnd, _ := utils.ParseTime(c.GetBaseOptions().ScrapeEnd)
	// tsStart, _ := utils.ParseTime(c.GetBaseOptions().ScrapeBegin)

	uniqueHosts := map[string]int{}             // host -> ssh-port
	uniqueArchList := make(map[string]struct{}) // map["os-arch"]{}
	hostPaths := make(map[string]set.StringSet) // host -> set of Prometheus data dirs on that host
	hostTasks := make(map[string]*task.Builder) // host -> per-host task chain (mkdir + copy scraper)

	topo := cls.Attributes[CollectModeTiUP].(spec.Topology)
	components := topo.ComponentsByUpdateOrder()
	var (
		dryRunTasks   []*task.StepDisplay
		downloadTasks []*task.StepDisplay
	)

	for _, comp := range components {
		// Only Prometheus instances hold TSDB data.
		if comp.Name() != spec.ComponentPrometheus {
			continue
		}

		for _, inst := range comp.Instances() {
			// Download the diag collector tool once per os/arch combination.
			archKey := fmt.Sprintf("%s-%s", inst.OS(), inst.Arch())
			if _, found := uniqueArchList[archKey]; !found {
				uniqueArchList[archKey] = struct{}{}
				t0 := task.NewBuilder(m.logger).
					Download(
						componentDiagCollector,
						inst.OS(),
						inst.Arch(),
						"", // latest version
					).
					BuildAsStep(fmt.Sprintf(" - Downloading collecting tools for %s/%s", inst.OS(), inst.Arch()))
				downloadTasks = append(downloadTasks, t0)
			}

			// tasks that applies to each host
			if _, found := uniqueHosts[inst.GetHost()]; !found {
				uniqueHosts[inst.GetHost()] = inst.GetSSHPort()
				// build system info collecting tasks
				t1, err := m.sshTaskBuilder(c.GetBaseOptions().Cluster, topo, c.GetBaseOptions().User, *c.opt)
				if err != nil {
					return nil, err
				}
				t1 = t1.
					Mkdir(c.GetBaseOptions().User, inst.GetHost(), filepath.Join(task.CheckToolsPathDir, "bin")).
					CopyComponent(
						componentDiagCollector,
						inst.OS(),
						inst.Arch(),
						"", // latest version
						"", // use default srcPath
						inst.GetHost(),
						task.CheckToolsPathDir,
					)
				hostTasks[inst.GetHost()] = t1
			}

			// add filepaths to list
			if _, found := hostPaths[inst.GetHost()]; !found {
				hostPaths[inst.GetHost()] = set.NewStringSet()
			}
			hostPaths[inst.GetHost()].Insert(inst.DataDir())
		}
	}

	// build scraper tasks: run the scraper remotely against the Prometheus
	// data dirs, then parse its JSON sample output into CollectStat entries.
	for h, t := range hostTasks {
		host := h // shadow for the closure below (pre-Go-1.22 capture semantics)
		t = t.
			Shell(
				host,
				fmt.Sprintf("%s --prometheus '%s' -f '%s' -t '%s'",
					filepath.Join(task.CheckToolsPathDir, "bin", "scraper"),
					strings.Join(hostPaths[host].Slice(), ","),
					c.ScrapeBegin, c.ScrapeEnd,
				),
				"",
				false,
			).
			Func(
				host,
				func(ctx context.Context) error {
					stats, err := parseScraperSamples(ctx, host)
					if err != nil {
						return err
					}
					// NOTE(review): c.fileStats is written here from callbacks that run
					// inside parallel task steps; if multiple Prometheus hosts exist this
					// is a concurrent map write — confirm the task runner serializes Func
					// steps, or guard with a mutex.
					for host, files := range stats {
						c.fileStats[host] = files
					}
					return nil
				},
			)
		t1 := t.BuildAsStep(fmt.Sprintf(" - Scraping prometheus data files on %s:%d", host, uniqueHosts[host]))
		dryRunTasks = append(dryRunTasks, t1)
	}

	t := task.NewBuilder(m.logger).
		ParallelStep("+ Download necessary tools", false, downloadTasks...).
		ParallelStep("+ Collect host information", false, dryRunTasks...).
		Build()

	ctx := ctxt.New(
		context.Background(),
		c.opt.Concurrency,
		m.logger,
	)
	if err := t.Execute(ctx); err != nil {
		if errorx.Cast(err) != nil {
			// FIXME: Map possible task errors and give suggestions.
			return nil, err
		}
		return nil, perrs.Trace(err)
	}

	return c.fileStats, nil
}

// Collect implements the Collector interface. It copies the TSDB files that
// Prepare discovered (c.fileStats) from the first Prometheus instance to the
// local result directory under monitor/raw/<host>-<port>/, then removes the
// temporary scraper tool directory from the remote host.
// Only supported in tiup-cluster mode; in other modes it is a no-op.
func (c *TSDBCollectOptions) Collect(m *Manager, cls *models.TiDBCluster) error {
	if m.mode != CollectModeTiUP {
		return nil
	}

	topo := cls.Attributes[CollectModeTiUP].(spec.Topology)
	var (
		collectTasks []*task.StepDisplay
		cleanTasks   []*task.StepDisplay
	)
	uniqueHosts := map[string]int{} // host -> ssh-port

	components := topo.ComponentsByUpdateOrder()

	for _, comp := range components {
		if comp.Name() != spec.ComponentPrometheus {
			continue
		}

		insts := comp.Instances()
		if len(insts) < 1 {
			return nil
		}

		// only collect from the first Prometheus instance
		inst := insts[0]
		// skip hosts that were already scheduled
		if _, found := uniqueHosts[inst.GetHost()]; found {
			continue
		}
		uniqueHosts[inst.GetHost()] = inst.GetSSHPort()

		t2, err := m.sshTaskBuilder(c.GetBaseOptions().Cluster, topo, c.GetBaseOptions().User, *c.opt)
		if err != nil {
			return err
		}
		for _, f := range c.fileStats[inst.GetHost()] {
			// chain one download per TSDB file found during Prepare
			t2 = t2.
				CopyFile(
					f.Target,
					filepath.Join(c.resultDir, subdirMonitor, subdirRaw, fmt.Sprintf("%s-%d", inst.GetHost(), inst.GetMainPort()), filepath.Base(f.Target)),
					inst.GetHost(),
					true,
					c.limit,
					c.compress,
				)
		}
		collectTasks = append(
			collectTasks,
			t2.BuildAsStep(fmt.Sprintf(" - Downloading prometheus data files from node %s", inst.GetHost())),
		)

		// schedule removal of the scraper tool dir deployed by Prepare
		b, err := m.sshTaskBuilder(c.GetBaseOptions().Cluster, topo, c.GetBaseOptions().User, *c.opt)
		if err != nil {
			return err
		}
		t3 := b.
			Rmdir(inst.GetHost(), task.CheckToolsPathDir).
			BuildAsStep(fmt.Sprintf(" - Cleanup temp files on %s:%d", inst.GetHost(), inst.GetSSHPort()))
		cleanTasks = append(cleanTasks, t3)
	}

	t := task.NewBuilder(m.logger).
		ParallelStep("+ Scrap files on nodes", false, collectTasks...).
		ParallelStep("+ Cleanup temp files", false, cleanTasks...).
		Build()

	ctx := ctxt.New(
		context.Background(),
		c.opt.Concurrency,
		m.logger,
	)
	if err := t.Execute(ctx); err != nil {
		if errorx.Cast(err) != nil {
			// FIXME: Map possible task errors and give suggestions.
			return err
		}
		return perrs.Trace(err)
	}

	return nil
}
Loading

0 comments on commit c0cacb0

Please sign in to comment.