From 06665cf7ea862dc267064e233310e474ba4851fa Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Fri, 29 Jul 2022 17:05:23 +0800 Subject: [PATCH 1/2] reparo support pprof --- reparo/config.go | 2 + reparo/decode.go | 1 + reparo/http_handler.go | 69 ++++++++++++++++++++++++++++++++++ reparo/metrics.go | 73 ++++++++++++++++++++++++++++++++++++ reparo/reparo.go | 20 +++++++++- reparo/syncer/mysql.go | 12 ++++-- reparo/syncer/mysql_test.go | 4 +- reparo/syncer/syncer.go | 5 ++- reparo/syncer/syncer_test.go | 2 +- 9 files changed, 177 insertions(+), 11 deletions(-) create mode 100644 reparo/http_handler.go create mode 100644 reparo/metrics.go diff --git a/reparo/config.go b/reparo/config.go index 32a78a410..f3f57c6ea 100644 --- a/reparo/config.go +++ b/reparo/config.go @@ -46,6 +46,7 @@ type Config struct { StopTSO int64 `toml:"stop-tso" json:"stop-tso"` TxnBatch int `toml:"txn-batch" json:"txn-batch"` WorkerCount int `toml:"worker-count" json:"worker-count"` + StatusAddr string `toml:"status-addr" json:"status-addr"` DestType string `toml:"dest-type" json:"dest-type"` DestDB *syncer.DBConfig `toml:"dest-db" json:"dest-db"` @@ -77,6 +78,7 @@ func NewConfig() *Config { fs.StringVar(&c.Dir, "data-dir", "", "drainer data directory path") fs.StringVar(&c.StartDatetime, "start-datetime", "", "recovery from start-datetime, empty string means starting from the beginning of the first file") fs.StringVar(&c.StopDatetime, "stop-datetime", "", "recovery end in stop-datetime, empty string means never end.") + fs.StringVar(&c.StatusAddr, "status-addr", ":8381", "reparo API server and pprof addr") fs.Int64Var(&c.StartTSO, "start-tso", 0, "similar to start-datetime but in pd-server tso format") fs.Int64Var(&c.StopTSO, "stop-tso", 0, "similar to stop-datetime, but in pd-server tso format") fs.IntVar(&c.TxnBatch, "txn-batch", 20, "number of binlog events in a transaction batch") diff --git a/reparo/decode.go b/reparo/decode.go index 9763bd4d6..3fcb2b558 100644 --- a/reparo/decode.go +++ b/reparo/decode.go @@ -28,6 +28,7 @@ func Decode(r io.Reader) (*pb.Binlog, int64, error) { if err != nil { return nil, 0, errors.Trace(err) } + readBinlogSizeHistogram.Observe(float64(length)) binlog := &pb.Binlog{} err = binlog.Unmarshal(payload) diff --git a/reparo/http_handler.go b/reparo/http_handler.go new file mode 100644 index 000000000..9c435ea0f --- /dev/null +++ b/reparo/http_handler.go @@ -0,0 +1,69 @@ +// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0. + +package reparo + +import ( + "net" + "net/http" + "net/http/pprof" + "strings" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/soheilhy/cmux" + "go.uber.org/zap" +) + +var cmuxReadTimeout = 10 * time.Second + +func startHTTPServer(lis net.Listener) { + router := http.NewServeMux() + router.Handle("/metrics", promhttp.Handler()) + + router.HandleFunc("/debug/pprof/", pprof.Index) + router.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) + router.HandleFunc("/debug/pprof/profile", pprof.Profile) + router.HandleFunc("/debug/pprof/symbol", pprof.Symbol) + router.HandleFunc("/debug/pprof/trace", pprof.Trace) + + httpServer := &http.Server{ + Handler: router, + } + err := httpServer.Serve(lis) + err = errors.Cause(err) + if err != nil && !isErrNetClosing(err) && err != http.ErrServerClosed { + log.Info("reparo http handler return with error", zap.String("error", err.Error())) + } +} + +func startReparoService(addr string) error { + rootLis, err := net.Listen("tcp", addr) + if err != nil { + return errors.Annotate(err, "start listening") + } + + // create a cmux + m := cmux.New(rootLis) + m.SetReadTimeout(cmuxReadTimeout) // set a timeout, ref: https://github.com/pingcap/tidb-binlog/pull/352 + + httpL := m.Match(cmux.HTTP1Fast()) + go startHTTPServer(httpL) + + err = m.Serve() // start serving, block + if err != nil && isErrNetClosing(err) { + err = nil + } + return err +} + +var useOfClosedErrMsg = "use of closed network connection" + +// isErrNetClosing checks whether is an ErrNetClosing error +func isErrNetClosing(err error) bool { + if err == nil { + return false + } + return strings.Contains(err.Error(), useOfClosedErrMsg) +} diff --git a/reparo/metrics.go b/reparo/metrics.go new file mode 100644 index 000000000..08abbbe62 --- /dev/null +++ b/reparo/metrics.go @@ -0,0 +1,73 @@ +package reparo + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +var ( + eventCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "binlog", + Subsystem: "drainer", + Name: "event", + Help: "the count of sql event(dml, ddl).", + }, []string{"type"}) + + checkpointTSOGauge = prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: "binlog", + Subsystem: "drainer", + Name: "checkpoint_tso", + Help: "save checkpoint tso of drainer.", + }) + executeHistogram = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: "binlog", + Subsystem: "drainer", + Name: "execute_duration_time", + Help: "Bucketed histogram of processing time (s) of a execute to sync data to downstream.", + Buckets: prometheus.ExponentialBuckets(0.00005, 2, 18), + }) + + queryHistogramVec = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "binlog", + Subsystem: "drainer", + Name: "query_duration_time", + Help: "Bucketed histogram of processing time (s) of a query to sync data to downstream.", + Buckets: prometheus.ExponentialBuckets(0.00005, 2, 18), + }, []string{"type"}) + + readBinlogSizeHistogram = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: "binlog", + Subsystem: "drainer", + Name: "read_binlog_size", + Help: "Bucketed histogram of size of a binlog.", + Buckets: prometheus.ExponentialBuckets(16, 2, 25), + }) + + queueSizeGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "binlog", + Subsystem: "drainer", + Name: "queue_size", + Help: "the size of queue", + }, []string{"name"}) +) + +func init() { + registry := prometheus.DefaultRegisterer + registry.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{})) + registry.MustRegister(prometheus.NewGoCollector()) + registry.MustRegister(checkpointTSOGauge) + registry.MustRegister(eventCounter) + registry.MustRegister(executeHistogram) + registry.MustRegister(readBinlogSizeHistogram) + registry.MustRegister(queryHistogramVec) + registry.MustRegister(queueSizeGauge) + + if gatherer, ok := registry.(prometheus.Gatherer); ok { + prometheus.DefaultGatherer = gatherer + } +} diff --git a/reparo/reparo.go b/reparo/reparo.go index 5600b1664..fbe0fba36 100644 --- a/reparo/reparo.go +++ b/reparo/reparo.go @@ -15,10 +15,12 @@ package reparo import ( "io" + "time" "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tidb-binlog/pkg/filter" + "github.com/pingcap/tidb-binlog/pkg/loader" pb "github.com/pingcap/tidb-binlog/proto/binlog" "github.com/pingcap/tidb-binlog/reparo/syncer" "github.com/pingcap/tidb/store/tikv/oracle" @@ -37,7 +39,11 @@ type Reparo struct { func New(cfg *Config) (*Reparo, error) { log.Info("New Reparo", zap.Stringer("config", cfg)) - syncer, err := syncer.New(cfg.DestType, cfg.DestDB, cfg.WorkerCount, cfg.TxnBatch, cfg.SafeMode) + syncer, err := syncer.New(cfg.DestType, cfg.DestDB, cfg.WorkerCount, cfg.TxnBatch, cfg.SafeMode, &loader.MetricsGroup{ + EventCounterVec: eventCounter, + QueryHistogramVec: queryHistogramVec, + QueueSizeGauge: queueSizeGauge, + }) if err != nil { return nil, errors.Trace(err) } @@ -53,6 +59,14 @@ func New(cfg *Config) (*Reparo, error) { // Process runs the main procedure. func (r *Reparo) Process() error { + if r.cfg.StatusAddr != "" { + go func() { + err := startReparoService(r.cfg.StatusAddr) + if err != nil { + log.Info("meet error when stopping reparo http service", zap.String("error", err.Error())) + } + }() + } pbReader, err := newDirPbReader(r.cfg.Dir, r.cfg.StartTSO, r.cfg.StopTSO) if err != nil { return errors.Annotatef(err, "new reader failed dir: %s", r.cfg.Dir) @@ -77,10 +91,12 @@ func (r *Reparo) Process() error { if ignore { continue } - + beginTime := time.Now() err = r.syncer.Sync(binlog, func(binlog *pb.Binlog) { dt := oracle.GetTimeFromTS(uint64(binlog.CommitTs)) log.Info("sync binlog success", zap.Int64("ts", binlog.CommitTs), zap.Time("datetime", dt)) + checkpointTSOGauge.Set(float64(oracle.ExtractPhysical(uint64(binlog.CommitTs)))) + executeHistogram.Observe(time.Since(beginTime).Seconds()) }) if err != nil { diff --git a/reparo/syncer/mysql.go b/reparo/syncer/mysql.go index f318eef96..c2c04866a 100644 --- a/reparo/syncer/mysql.go +++ b/reparo/syncer/mysql.go @@ -49,17 +49,21 @@ var ( // should be only used for unit test to create mock db var createDB = loader.CreateDB -func newMysqlSyncer(cfg *DBConfig, worker int, batchSize int, safemode bool) (*mysqlSyncer, error) { +func newMysqlSyncer(cfg *DBConfig, worker int, batchSize int, safemode bool, metricsGroup *loader.MetricsGroup) (*mysqlSyncer, error) { db, err := createDB(cfg.User, cfg.Password, cfg.Host, cfg.Port, nil) if err != nil { return nil, errors.Trace(err) } - return newMysqlSyncerFromSQLDB(db, worker, batchSize, safemode) + return newMysqlSyncerFromSQLDB(db, worker, batchSize, safemode, metricsGroup) } -func newMysqlSyncerFromSQLDB(db *sql.DB, worker int, batchSize int, safemode bool) (*mysqlSyncer, error) { - loader, err := loader.NewLoader(db, loader.WorkerCount(worker), loader.BatchSize(batchSize)) +func newMysqlSyncerFromSQLDB(db *sql.DB, worker int, batchSize int, safemode bool, metricsGroup *loader.MetricsGroup) (*mysqlSyncer, error) { + opts := []loader.Option{loader.WorkerCount(worker), loader.BatchSize(batchSize)} + if metricsGroup != nil { + opts = append(opts, loader.Metrics(metricsGroup)) + } + loader, err := loader.NewLoader(db, opts...) if err != nil { return nil, errors.Annotate(err, "new loader failed") } diff --git a/reparo/syncer/mysql_test.go b/reparo/syncer/mysql_test.go index 3c93d87ea..a49641a0b 100644 --- a/reparo/syncer/mysql_test.go +++ b/reparo/syncer/mysql_test.go @@ -5,7 +5,7 @@ import ( "database/sql" "time" - sqlmock "github.com/DATA-DOG/go-sqlmock" + "github.com/DATA-DOG/go-sqlmock" "github.com/pingcap/check" pb "github.com/pingcap/tidb-binlog/proto/binlog" ) @@ -33,7 +33,7 @@ func (s *testMysqlSuite) testMysqlSyncer(c *check.C, safemode bool) { createDB = oldCreateDB }() - syncer, err := newMysqlSyncer(&DBConfig{}, 1, 20, safemode) + syncer, err := newMysqlSyncer(&DBConfig{}, 1, 20, safemode, nil) c.Assert(err, check.IsNil) mock.ExpectBegin() diff --git a/reparo/syncer/syncer.go b/reparo/syncer/syncer.go index 8e1a1810d..e679c1ce8 100644 --- a/reparo/syncer/syncer.go +++ b/reparo/syncer/syncer.go @@ -16,6 +16,7 @@ package syncer import ( "fmt" + "github.com/pingcap/tidb-binlog/pkg/loader" pb "github.com/pingcap/tidb-binlog/proto/binlog" ) @@ -29,10 +30,10 @@ type Syncer interface { } // New creates a new executor based on the name. -func New(name string, cfg *DBConfig, worker int, batchSize int, safemode bool) (Syncer, error) { +func New(name string, cfg *DBConfig, worker int, batchSize int, safemode bool, metricsGroup *loader.MetricsGroup) (Syncer, error) { switch name { case "mysql": - return newMysqlSyncer(cfg, worker, batchSize, safemode) + return newMysqlSyncer(cfg, worker, batchSize, safemode, metricsGroup) case "print": return newPrintSyncer() case "memory": diff --git a/reparo/syncer/syncer_test.go b/reparo/syncer/syncer_test.go index 4ebd8c831..395a30c1b 100644 --- a/reparo/syncer/syncer_test.go +++ b/reparo/syncer/syncer_test.go @@ -34,7 +34,7 @@ func (s *testSyncerSuite) TestNewSyncer(c *check.C) { } for _, testCase := range testCases { - syncer, err := New(testCase.typeStr, cfg, 16, 20, false) + syncer, err := New(testCase.typeStr, cfg, 16, 20, false, nil) c.Assert(err, check.IsNil) c.Assert(reflect.TypeOf(syncer), testCase.checker, testCase.tp) } From 000eea4fe24b99f70bafc86f4a7b070389e19399 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Fri, 29 Jul 2022 18:10:49 +0800 Subject: [PATCH 2/2] fix --- reparo/metrics.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/reparo/metrics.go b/reparo/metrics.go index 08abbbe62..c169f4c9e 100644 --- a/reparo/metrics.go +++ b/reparo/metrics.go @@ -57,7 +57,7 @@ var ( ) func init() { - registry := prometheus.DefaultRegisterer + registry := prometheus.NewRegistry() registry.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{})) registry.MustRegister(prometheus.NewGoCollector()) registry.MustRegister(checkpointTSOGauge) @@ -67,7 +67,5 @@ func init() { registry.MustRegister(queryHistogramVec) registry.MustRegister(queueSizeGauge) - if gatherer, ok := registry.(prometheus.Gatherer); ok { - prometheus.DefaultGatherer = gatherer - } + prometheus.DefaultGatherer = registry }