Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reparo: support pprof and metrics #1184

Merged
merged 5 commits into from
Aug 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions reparo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type Config struct {
StopTSO int64 `toml:"stop-tso" json:"stop-tso"`
TxnBatch int `toml:"txn-batch" json:"txn-batch"`
WorkerCount int `toml:"worker-count" json:"worker-count"`
StatusAddr string `toml:"status-addr" json:"status-addr"`

DestType string `toml:"dest-type" json:"dest-type"`
DestDB *syncer.DBConfig `toml:"dest-db" json:"dest-db"`
Expand Down Expand Up @@ -77,6 +78,7 @@ func NewConfig() *Config {
fs.StringVar(&c.Dir, "data-dir", "", "drainer data directory path")
fs.StringVar(&c.StartDatetime, "start-datetime", "", "recovery from start-datetime, empty string means starting from the beginning of the first file")
fs.StringVar(&c.StopDatetime, "stop-datetime", "", "recovery end in stop-datetime, empty string means never end.")
fs.StringVar(&c.StatusAddr, "status-addr", ":8381", "reparo API server and pprof addr")
fs.Int64Var(&c.StartTSO, "start-tso", 0, "similar to start-datetime but in pd-server tso format")
fs.Int64Var(&c.StopTSO, "stop-tso", 0, "similar to stop-datetime, but in pd-server tso format")
fs.IntVar(&c.TxnBatch, "txn-batch", 20, "number of binlog events in a transaction batch")
Expand Down
1 change: 1 addition & 0 deletions reparo/decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func Decode(r io.Reader) (*pb.Binlog, int64, error) {
if err != nil {
return nil, 0, errors.Trace(err)
}
readBinlogSizeHistogram.Observe(float64(length))

binlog := &pb.Binlog{}
err = binlog.Unmarshal(payload)
Expand Down
69 changes: 69 additions & 0 deletions reparo/http_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0.

package reparo

import (
"net"
"net/http"
"net/http/pprof"
"strings"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/soheilhy/cmux"
"go.uber.org/zap"
)

// cmuxReadTimeout is the read timeout handed to the cmux multiplexer through
// SetReadTimeout in startReparoService.
var cmuxReadTimeout = 10 * time.Second

// startHTTPServer serves the reparo status endpoints on lis and blocks until
// the HTTP server stops.
//
// Registered routes:
//   - /metrics: the Prometheus handler returned by promhttp.Handler.
//   - /debug/pprof/ and its cmdline, profile, symbol and trace sub-paths: the
//     standard net/http/pprof handlers.
//
// An error from Serve is logged unless it only reports that the listener or
// the server was closed, which is treated as a normal shutdown.
func startHTTPServer(lis net.Listener) {
	// Bound how long a client may take to send its request headers so that a
	// stalled or malicious peer cannot pin a connection and its goroutine
	// forever. WriteTimeout is deliberately left unset: the pprof profile and
	// trace handlers keep writing for the whole sampling window.
	const readHeaderTimeout = 10 * time.Second

	router := http.NewServeMux()
	router.Handle("/metrics", promhttp.Handler())

	router.HandleFunc("/debug/pprof/", pprof.Index)
	router.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
	router.HandleFunc("/debug/pprof/profile", pprof.Profile)
	router.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
	router.HandleFunc("/debug/pprof/trace", pprof.Trace)

	httpServer := &http.Server{
		Handler:           router,
		ReadHeaderTimeout: readHeaderTimeout,
	}

	// Unwrap to the root cause so the sentinel comparison below still works
	// if the error was annotated on the way up.
	err := errors.Cause(httpServer.Serve(lis))
	if err != nil && !isErrNetClosing(err) && err != http.ErrServerClosed {
		log.Info("reparo http handler return with error", zap.String("error", err.Error()))
	}
}

// startReparoService opens a TCP listener on addr, multiplexes it with cmux
// and hands HTTP/1 connections to startHTTPServer. It blocks until the
// multiplexer stops serving.
//
// It returns an annotated error when listening fails, nil when serving ends
// with a "use of closed network connection" error (or with no error), and the
// serve error otherwise.
func startReparoService(addr string) error {
	listener, err := net.Listen("tcp", addr)
	if err != nil {
		return errors.Annotate(err, "start listening")
	}

	mux := cmux.New(listener)
	// Set a timeout, ref: https://github.com/pingcap/tidb-binlog/pull/352
	mux.SetReadTimeout(cmuxReadTimeout)

	// The HTTP server runs in its own goroutine on the matched sub-listener.
	go startHTTPServer(mux.Match(cmux.HTTP1Fast()))

	// Serve blocks; a closed listener is the expected way for it to end.
	serveErr := mux.Serve()
	if isErrNetClosing(serveErr) {
		return nil
	}
	return serveErr
}

// useOfClosedErrMsg is the message fragment that identifies an error caused by
// using an already-closed network connection.
var useOfClosedErrMsg = "use of closed network connection"

// isErrNetClosing reports whether err is a "use of closed network connection"
// error. The check is a substring match on the error message; a nil err yields
// false.
func isErrNetClosing(err error) bool {
	return err != nil && strings.Contains(err.Error(), useOfClosedErrMsg)
}
71 changes: 71 additions & 0 deletions reparo/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package reparo

import (
"github.com/prometheus/client_golang/prometheus"
)

// Prometheus collectors exposed by reparo. Every metric is published under the
// "binlog" namespace and the "drainer" subsystem.
var (
	// Counters.

	// eventCounter counts SQL events (dml, ddl), partitioned by the "type" label.
	eventCounter = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "binlog",
			Subsystem: "drainer",
			Name:      "event",
			Help:      "the count of sql event(dml, ddl).",
		},
		[]string{"type"},
	)

	// Gauges.

	// checkpointTSOGauge holds the most recently saved checkpoint TSO.
	checkpointTSOGauge = prometheus.NewGauge(
		prometheus.GaugeOpts{
			Namespace: "binlog",
			Subsystem: "drainer",
			Name:      "checkpoint_tso",
			Help:      "save checkpoint tso of drainer.",
		},
	)

	// queueSizeGauge tracks queue sizes, partitioned by the "name" label.
	queueSizeGauge = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: "binlog",
			Subsystem: "drainer",
			Name:      "queue_size",
			Help:      "the size of queue",
		},
		[]string{"name"},
	)

	// Histograms.

	// executeHistogram records how long (in seconds) one execute takes to sync
	// data downstream. Its 18 buckets double from 50µs up to about 6.55s.
	executeHistogram = prometheus.NewHistogram(
		prometheus.HistogramOpts{
			Namespace: "binlog",
			Subsystem: "drainer",
			Name:      "execute_duration_time",
			Help:      "Bucketed histogram of processing time (s) of a execute to sync data to downstream.",
			Buckets:   prometheus.ExponentialBuckets(0.00005, 2, 18),
		},
	)

	// queryHistogramVec records how long (in seconds) one query takes to sync
	// data downstream, partitioned by the "type" label. It uses the same
	// bucket layout as executeHistogram.
	queryHistogramVec = prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Namespace: "binlog",
			Subsystem: "drainer",
			Name:      "query_duration_time",
			Help:      "Bucketed histogram of processing time (s) of a query to sync data to downstream.",
			Buckets:   prometheus.ExponentialBuckets(0.00005, 2, 18),
		},
		[]string{"type"},
	)

	// readBinlogSizeHistogram records the size of each binlog read. Its 25
	// buckets double from 16 up to 16*2^24 (presumably bytes; confirm against
	// the caller that observes it).
	readBinlogSizeHistogram = prometheus.NewHistogram(
		prometheus.HistogramOpts{
			Namespace: "binlog",
			Subsystem: "drainer",
			Name:      "read_binlog_size",
			Help:      "Bucketed histogram of size of a binlog.",
			Buckets:   prometheus.ExponentialBuckets(16, 2, 25),
		},
	)
)

// init builds a dedicated Prometheus registry containing the process and Go
// runtime collectors plus every reparo metric declared in this file, then
// installs it as prometheus.DefaultGatherer.
func init() {
	reg := prometheus.NewRegistry()

	// MustRegister panics on the first collector that fails to register, so a
	// single variadic call keeps the same order and failure behavior as
	// registering the collectors one by one.
	reg.MustRegister(
		prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}),
		prometheus.NewGoCollector(),
		checkpointTSOGauge,
		eventCounter,
		executeHistogram,
		readBinlogSizeHistogram,
		queryHistogramVec,
		queueSizeGauge,
	)

	prometheus.DefaultGatherer = reg
}
20 changes: 18 additions & 2 deletions reparo/reparo.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ package reparo

import (
"io"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb-binlog/pkg/filter"
"github.com/pingcap/tidb-binlog/pkg/loader"
pb "github.com/pingcap/tidb-binlog/proto/binlog"
"github.com/pingcap/tidb-binlog/reparo/syncer"
"github.com/tikv/client-go/v2/oracle"
Expand All @@ -37,7 +39,11 @@ type Reparo struct {
func New(cfg *Config) (*Reparo, error) {
log.Info("New Reparo", zap.Stringer("config", cfg))

syncer, err := syncer.New(cfg.DestType, cfg.DestDB, cfg.WorkerCount, cfg.TxnBatch, cfg.SafeMode)
syncer, err := syncer.New(cfg.DestType, cfg.DestDB, cfg.WorkerCount, cfg.TxnBatch, cfg.SafeMode, &loader.MetricsGroup{
EventCounterVec: eventCounter,
QueryHistogramVec: queryHistogramVec,
QueueSizeGauge: queueSizeGauge,
})
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -53,6 +59,14 @@ func New(cfg *Config) (*Reparo, error) {

// Process runs the main procedure.
func (r *Reparo) Process() error {
if r.cfg.StatusAddr != "" {
go func() {
err := startReparoService(r.cfg.StatusAddr)
if err != nil {
log.Info("meet error when stopping reparo http service", zap.String("error", err.Error()))
}
}()
}
pbReader, err := newDirPbReader(r.cfg.Dir, r.cfg.StartTSO, r.cfg.StopTSO)
if err != nil {
return errors.Annotatef(err, "new reader failed dir: %s", r.cfg.Dir)
Expand All @@ -77,10 +91,12 @@ func (r *Reparo) Process() error {
if ignore {
continue
}

beginTime := time.Now()
err = r.syncer.Sync(binlog, func(binlog *pb.Binlog) {
dt := oracle.GetTimeFromTS(uint64(binlog.CommitTs))
log.Info("sync binlog success", zap.Int64("ts", binlog.CommitTs), zap.Time("datetime", dt))
checkpointTSOGauge.Set(float64(oracle.ExtractPhysical(uint64(binlog.CommitTs))))
executeHistogram.Observe(time.Since(beginTime).Seconds())
})

if err != nil {
Expand Down
12 changes: 8 additions & 4 deletions reparo/syncer/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,21 @@ var (
// should be only used for unit test to create mock db
var createDB = loader.CreateDB

func newMysqlSyncer(cfg *DBConfig, worker int, batchSize int, safemode bool) (*mysqlSyncer, error) {
func newMysqlSyncer(cfg *DBConfig, worker int, batchSize int, safemode bool, metricsGroup *loader.MetricsGroup) (*mysqlSyncer, error) {
db, err := createDB(cfg.User, cfg.Password, cfg.Host, cfg.Port, nil)
if err != nil {
return nil, errors.Trace(err)
}

return newMysqlSyncerFromSQLDB(db, worker, batchSize, safemode)
return newMysqlSyncerFromSQLDB(db, worker, batchSize, safemode, metricsGroup)
}

func newMysqlSyncerFromSQLDB(db *sql.DB, worker int, batchSize int, safemode bool) (*mysqlSyncer, error) {
loader, err := loader.NewLoader(db, loader.WorkerCount(worker), loader.BatchSize(batchSize))
func newMysqlSyncerFromSQLDB(db *sql.DB, worker int, batchSize int, safemode bool, metricsGroup *loader.MetricsGroup) (*mysqlSyncer, error) {
opts := []loader.Option{loader.WorkerCount(worker), loader.BatchSize(batchSize)}
if metricsGroup != nil {
opts = append(opts, loader.Metrics(metricsGroup))
}
loader, err := loader.NewLoader(db, opts...)
if err != nil {
return nil, errors.Annotate(err, "new loader failed")
}
Expand Down
4 changes: 2 additions & 2 deletions reparo/syncer/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"database/sql"
"time"

sqlmock "github.com/DATA-DOG/go-sqlmock"
"github.com/DATA-DOG/go-sqlmock"
"github.com/pingcap/check"
pb "github.com/pingcap/tidb-binlog/proto/binlog"
)
Expand Down Expand Up @@ -33,7 +33,7 @@ func (s *testMysqlSuite) testMysqlSyncer(c *check.C, safemode bool) {
createDB = oldCreateDB
}()

syncer, err := newMysqlSyncer(&DBConfig{}, 1, 20, safemode)
syncer, err := newMysqlSyncer(&DBConfig{}, 1, 20, safemode, nil)
c.Assert(err, check.IsNil)

mock.ExpectBegin()
Expand Down
5 changes: 3 additions & 2 deletions reparo/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package syncer
import (
"fmt"

"github.com/pingcap/tidb-binlog/pkg/loader"
pb "github.com/pingcap/tidb-binlog/proto/binlog"
)

Expand All @@ -29,10 +30,10 @@ type Syncer interface {
}

// New creates a new executor based on the name.
func New(name string, cfg *DBConfig, worker int, batchSize int, safemode bool) (Syncer, error) {
func New(name string, cfg *DBConfig, worker int, batchSize int, safemode bool, metricsGroup *loader.MetricsGroup) (Syncer, error) {
switch name {
case "mysql":
return newMysqlSyncer(cfg, worker, batchSize, safemode)
return newMysqlSyncer(cfg, worker, batchSize, safemode, metricsGroup)
case "print":
return newPrintSyncer()
case "memory":
Expand Down
2 changes: 1 addition & 1 deletion reparo/syncer/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (s *testSyncerSuite) TestNewSyncer(c *check.C) {
}

for _, testCase := range testCases {
syncer, err := New(testCase.typeStr, cfg, 16, 20, false)
syncer, err := New(testCase.typeStr, cfg, 16, 20, false, nil)
c.Assert(err, check.IsNil)
c.Assert(reflect.TypeOf(syncer), testCase.checker, testCase.tp)
}
Expand Down