Skip to content

Commit

Permalink
reparo: support pprof and metrics (#1184)
Browse files Browse the repository at this point in the history
* reparo support pprof (#1182)

* fix prometheus.registry (#1183)

* reparo support pprof

* fix

* update go.mod

* update

* update
  • Loading branch information
lichunzhu authored Aug 1, 2022
1 parent 5c7c040 commit 361a30c
Show file tree
Hide file tree
Showing 9 changed files with 175 additions and 11 deletions.
2 changes: 2 additions & 0 deletions reparo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type Config struct {
StopTSO int64 `toml:"stop-tso" json:"stop-tso"`
TxnBatch int `toml:"txn-batch" json:"txn-batch"`
WorkerCount int `toml:"worker-count" json:"worker-count"`
StatusAddr string `toml:"status-addr" json:"status-addr"`

DestType string `toml:"dest-type" json:"dest-type"`
DestDB *syncer.DBConfig `toml:"dest-db" json:"dest-db"`
Expand Down Expand Up @@ -77,6 +78,7 @@ func NewConfig() *Config {
fs.StringVar(&c.Dir, "data-dir", "", "drainer data directory path")
fs.StringVar(&c.StartDatetime, "start-datetime", "", "recovery from start-datetime, empty string means starting from the beginning of the first file")
fs.StringVar(&c.StopDatetime, "stop-datetime", "", "recovery end in stop-datetime, empty string means never end.")
fs.StringVar(&c.StatusAddr, "status-addr", ":8381", "reparo API server and pprof addr")
fs.Int64Var(&c.StartTSO, "start-tso", 0, "similar to start-datetime but in pd-server tso format")
fs.Int64Var(&c.StopTSO, "stop-tso", 0, "similar to stop-datetime, but in pd-server tso format")
fs.IntVar(&c.TxnBatch, "txn-batch", 20, "number of binlog events in a transaction batch")
Expand Down
1 change: 1 addition & 0 deletions reparo/decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func Decode(r io.Reader) (*pb.Binlog, int64, error) {
if err != nil {
return nil, 0, errors.Trace(err)
}
readBinlogSizeHistogram.Observe(float64(length))

binlog := &pb.Binlog{}
err = binlog.Unmarshal(payload)
Expand Down
69 changes: 69 additions & 0 deletions reparo/http_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0.

package reparo

import (
"net"
"net/http"
"net/http/pprof"
"strings"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/soheilhy/cmux"
"go.uber.org/zap"
)

var cmuxReadTimeout = 10 * time.Second

func startHTTPServer(lis net.Listener) {
router := http.NewServeMux()
router.Handle("/metrics", promhttp.Handler())

router.HandleFunc("/debug/pprof/", pprof.Index)
router.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
router.HandleFunc("/debug/pprof/profile", pprof.Profile)
router.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
router.HandleFunc("/debug/pprof/trace", pprof.Trace)

httpServer := &http.Server{
Handler: router,
}
err := httpServer.Serve(lis)
err = errors.Cause(err)
if err != nil && !isErrNetClosing(err) && err != http.ErrServerClosed {
log.Info("reparo http handler return with error", zap.String("error", err.Error()))
}
}

func startReparoService(addr string) error {
rootLis, err := net.Listen("tcp", addr)
if err != nil {
return errors.Annotate(err, "start listening")
}

// create a cmux
m := cmux.New(rootLis)
m.SetReadTimeout(cmuxReadTimeout) // set a timeout, ref: https://github.com/pingcap/tidb-binlog/pull/352

httpL := m.Match(cmux.HTTP1Fast())
go startHTTPServer(httpL)

err = m.Serve() // start serving, block
if err != nil && isErrNetClosing(err) {
err = nil
}
return err
}

var useOfClosedErrMsg = "use of closed network connection"

// isErrNetClosing checks whether is an ErrNetClosing error
func isErrNetClosing(err error) bool {
if err == nil {
return false
}
return strings.Contains(err.Error(), useOfClosedErrMsg)
}
71 changes: 71 additions & 0 deletions reparo/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package reparo

import (
"github.com/prometheus/client_golang/prometheus"
)

var (
eventCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "binlog",
Subsystem: "drainer",
Name: "event",
Help: "the count of sql event(dml, ddl).",
}, []string{"type"})

checkpointTSOGauge = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: "binlog",
Subsystem: "drainer",
Name: "checkpoint_tso",
Help: "save checkpoint tso of drainer.",
})
executeHistogram = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "binlog",
Subsystem: "drainer",
Name: "execute_duration_time",
Help: "Bucketed histogram of processing time (s) of a execute to sync data to downstream.",
Buckets: prometheus.ExponentialBuckets(0.00005, 2, 18),
})

queryHistogramVec = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "binlog",
Subsystem: "drainer",
Name: "query_duration_time",
Help: "Bucketed histogram of processing time (s) of a query to sync data to downstream.",
Buckets: prometheus.ExponentialBuckets(0.00005, 2, 18),
}, []string{"type"})

readBinlogSizeHistogram = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "binlog",
Subsystem: "drainer",
Name: "read_binlog_size",
Help: "Bucketed histogram of size of a binlog.",
Buckets: prometheus.ExponentialBuckets(16, 2, 25),
})

queueSizeGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "binlog",
Subsystem: "drainer",
Name: "queue_size",
Help: "the size of queue",
}, []string{"name"})
)

func init() {
registry := prometheus.NewRegistry()
registry.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}))
registry.MustRegister(prometheus.NewGoCollector())
registry.MustRegister(checkpointTSOGauge)
registry.MustRegister(eventCounter)
registry.MustRegister(executeHistogram)
registry.MustRegister(readBinlogSizeHistogram)
registry.MustRegister(queryHistogramVec)
registry.MustRegister(queueSizeGauge)

prometheus.DefaultGatherer = registry
}
20 changes: 18 additions & 2 deletions reparo/reparo.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ package reparo

import (
"io"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb-binlog/pkg/filter"
"github.com/pingcap/tidb-binlog/pkg/loader"
pb "github.com/pingcap/tidb-binlog/proto/binlog"
"github.com/pingcap/tidb-binlog/reparo/syncer"
"github.com/tikv/client-go/v2/oracle"
Expand All @@ -37,7 +39,11 @@ type Reparo struct {
func New(cfg *Config) (*Reparo, error) {
log.Info("New Reparo", zap.Stringer("config", cfg))

syncer, err := syncer.New(cfg.DestType, cfg.DestDB, cfg.WorkerCount, cfg.TxnBatch, cfg.SafeMode)
syncer, err := syncer.New(cfg.DestType, cfg.DestDB, cfg.WorkerCount, cfg.TxnBatch, cfg.SafeMode, &loader.MetricsGroup{
EventCounterVec: eventCounter,
QueryHistogramVec: queryHistogramVec,
QueueSizeGauge: queueSizeGauge,
})
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -53,6 +59,14 @@ func New(cfg *Config) (*Reparo, error) {

// Process runs the main procedure.
func (r *Reparo) Process() error {
if r.cfg.StatusAddr != "" {
go func() {
err := startReparoService(r.cfg.StatusAddr)
if err != nil {
log.Info("meet error when stopping reparo http service", zap.String("error", err.Error()))
}
}()
}
pbReader, err := newDirPbReader(r.cfg.Dir, r.cfg.StartTSO, r.cfg.StopTSO)
if err != nil {
return errors.Annotatef(err, "new reader failed dir: %s", r.cfg.Dir)
Expand All @@ -77,10 +91,12 @@ func (r *Reparo) Process() error {
if ignore {
continue
}

beginTime := time.Now()
err = r.syncer.Sync(binlog, func(binlog *pb.Binlog) {
dt := oracle.GetTimeFromTS(uint64(binlog.CommitTs))
log.Info("sync binlog success", zap.Int64("ts", binlog.CommitTs), zap.Time("datetime", dt))
checkpointTSOGauge.Set(float64(oracle.ExtractPhysical(uint64(binlog.CommitTs))))
executeHistogram.Observe(time.Since(beginTime).Seconds())
})

if err != nil {
Expand Down
12 changes: 8 additions & 4 deletions reparo/syncer/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,21 @@ var (
// should be only used for unit test to create mock db
var createDB = loader.CreateDB

func newMysqlSyncer(cfg *DBConfig, worker int, batchSize int, safemode bool) (*mysqlSyncer, error) {
func newMysqlSyncer(cfg *DBConfig, worker int, batchSize int, safemode bool, metricsGroup *loader.MetricsGroup) (*mysqlSyncer, error) {
db, err := createDB(cfg.User, cfg.Password, cfg.Host, cfg.Port, nil)
if err != nil {
return nil, errors.Trace(err)
}

return newMysqlSyncerFromSQLDB(db, worker, batchSize, safemode)
return newMysqlSyncerFromSQLDB(db, worker, batchSize, safemode, metricsGroup)
}

func newMysqlSyncerFromSQLDB(db *sql.DB, worker int, batchSize int, safemode bool) (*mysqlSyncer, error) {
loader, err := loader.NewLoader(db, loader.WorkerCount(worker), loader.BatchSize(batchSize))
func newMysqlSyncerFromSQLDB(db *sql.DB, worker int, batchSize int, safemode bool, metricsGroup *loader.MetricsGroup) (*mysqlSyncer, error) {
opts := []loader.Option{loader.WorkerCount(worker), loader.BatchSize(batchSize)}
if metricsGroup != nil {
opts = append(opts, loader.Metrics(metricsGroup))
}
loader, err := loader.NewLoader(db, opts...)
if err != nil {
return nil, errors.Annotate(err, "new loader failed")
}
Expand Down
4 changes: 2 additions & 2 deletions reparo/syncer/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"database/sql"
"time"

sqlmock "github.com/DATA-DOG/go-sqlmock"
"github.com/DATA-DOG/go-sqlmock"
"github.com/pingcap/check"
pb "github.com/pingcap/tidb-binlog/proto/binlog"
)
Expand Down Expand Up @@ -33,7 +33,7 @@ func (s *testMysqlSuite) testMysqlSyncer(c *check.C, safemode bool) {
createDB = oldCreateDB
}()

syncer, err := newMysqlSyncer(&DBConfig{}, 1, 20, safemode)
syncer, err := newMysqlSyncer(&DBConfig{}, 1, 20, safemode, nil)
c.Assert(err, check.IsNil)

mock.ExpectBegin()
Expand Down
5 changes: 3 additions & 2 deletions reparo/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package syncer
import (
"fmt"

"github.com/pingcap/tidb-binlog/pkg/loader"
pb "github.com/pingcap/tidb-binlog/proto/binlog"
)

Expand All @@ -29,10 +30,10 @@ type Syncer interface {
}

// New creates a new executor based on the name.
func New(name string, cfg *DBConfig, worker int, batchSize int, safemode bool) (Syncer, error) {
func New(name string, cfg *DBConfig, worker int, batchSize int, safemode bool, metricsGroup *loader.MetricsGroup) (Syncer, error) {
switch name {
case "mysql":
return newMysqlSyncer(cfg, worker, batchSize, safemode)
return newMysqlSyncer(cfg, worker, batchSize, safemode, metricsGroup)
case "print":
return newPrintSyncer()
case "memory":
Expand Down
2 changes: 1 addition & 1 deletion reparo/syncer/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (s *testSyncerSuite) TestNewSyncer(c *check.C) {
}

for _, testCase := range testCases {
syncer, err := New(testCase.typeStr, cfg, 16, 20, false)
syncer, err := New(testCase.typeStr, cfg, 16, 20, false, nil)
c.Assert(err, check.IsNil)
c.Assert(reflect.TypeOf(syncer), testCase.checker, testCase.tp)
}
Expand Down

0 comments on commit 361a30c

Please sign in to comment.