Skip to content

Commit

Permalink
Support heartbeats in UTC (#471)
Browse files Browse the repository at this point in the history
Signed-off-by: Dima Kurguzov <[email protected]>
  • Loading branch information
koorgoo authored May 28, 2020
1 parent 5667073 commit 5b5ba74
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 38 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ collect.slave_hosts | 5.1 | C
collect.heartbeat | 5.1 | Collect from [heartbeat](#heartbeat).
collect.heartbeat.database | 5.1 | Database from where to collect heartbeat data. (default: heartbeat)
collect.heartbeat.table | 5.1 | Table from where to collect heartbeat data. (default: heartbeat)
collect.heartbeat.utc | 5.1 | Use UTC for timestamps of the current server (`pt-heartbeat` is called with `--utc`). (default: false)


### General Flags
Expand Down
16 changes: 14 additions & 2 deletions collector/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ const (
// timestamps. %s will be replaced by the database and table name.
// The second column allows gets the server timestamp at the exact same
// time the query is run.
heartbeatQuery = "SELECT UNIX_TIMESTAMP(ts), UNIX_TIMESTAMP(NOW(6)), server_id from `%s`.`%s`"
heartbeatQuery = "SELECT UNIX_TIMESTAMP(ts), UNIX_TIMESTAMP(%s), server_id from `%s`.`%s`"
)

var (
Expand All @@ -45,6 +45,10 @@ var (
"collect.heartbeat.table",
"Table from where to collect heartbeat data",
).Default("heartbeat").String()
collectHeartbeatUtc = kingpin.Flag(
"collect.heartbeat.utc",
"Use UTC for timestamps of the current server (`pt-heartbeat` is called with `--utc`)",
).Bool()
)

// Metric descriptors.
Expand Down Expand Up @@ -85,9 +89,17 @@ func (ScrapeHeartbeat) Version() float64 {
return 5.1
}

// nowExpr returns a current timestamp expression.
func nowExpr() string {
if *collectHeartbeatUtc {
return "UTC_TIMESTAMP(6)"
}
return "NOW(6)"
}

// Scrape collects data from database connection and sends it over channel as prometheus metric.
func (ScrapeHeartbeat) Scrape(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric, logger log.Logger) error {
query := fmt.Sprintf(heartbeatQuery, *collectHeartbeatDatabase, *collectHeartbeatTable)
query := fmt.Sprintf(heartbeatQuery, nowExpr(), *collectHeartbeatDatabase, *collectHeartbeatTable)
heartbeatRows, err := db.QueryContext(ctx, query)
if err != nil {
return err
Expand Down
99 changes: 63 additions & 36 deletions collector/heartbeat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package collector

import (
"context"
"fmt"
"testing"

"github.com/DATA-DOG/go-sqlmock"
Expand All @@ -25,47 +26,73 @@ import (
"gopkg.in/alecthomas/kingpin.v2"
)

type ScrapeHeartbeatTestCase struct {
Args []string
Columns []string
Query string
}

var ScrapeHeartbeatTestCases = []ScrapeHeartbeatTestCase{
{
[]string{
"--collect.heartbeat.database", "heartbeat-test",
"--collect.heartbeat.table", "heartbeat-test",
},
[]string{"UNIX_TIMESTAMP(ts)", "UNIX_TIMESTAMP(NOW(6))", "server_id"},
"SELECT UNIX_TIMESTAMP(ts), UNIX_TIMESTAMP(NOW(6)), server_id from `heartbeat-test`.`heartbeat-test`",
},
{
[]string{
"--collect.heartbeat.database", "heartbeat-test",
"--collect.heartbeat.table", "heartbeat-test",
"--collect.heartbeat.utc",
},
[]string{"UNIX_TIMESTAMP(ts)", "UNIX_TIMESTAMP(UTC_TIMESTAMP(6))", "server_id"},
"SELECT UNIX_TIMESTAMP(ts), UNIX_TIMESTAMP(UTC_TIMESTAMP(6)), server_id from `heartbeat-test`.`heartbeat-test`",
},
}

func TestScrapeHeartbeat(t *testing.T) {
_, err := kingpin.CommandLine.Parse([]string{
"--collect.heartbeat.database", "heartbeat-test",
"--collect.heartbeat.table", "heartbeat-test",
})
if err != nil {
t.Fatal(err)
}
for _, tt := range ScrapeHeartbeatTestCases {
t.Run(fmt.Sprint(tt.Args), func(t *testing.T) {
_, err := kingpin.CommandLine.Parse(tt.Args)
if err != nil {
t.Fatal(err)
}

db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("error opening a stub database connection: %s", err)
}
defer db.Close()
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("error opening a stub database connection: %s", err)
}
defer db.Close()

columns := []string{"UNIX_TIMESTAMP(ts)", "UNIX_TIMESTAMP(NOW(6))", "server_id"}
rows := sqlmock.NewRows(columns).
AddRow("1487597613.001320", "1487598113.448042", 1)
mock.ExpectQuery(sanitizeQuery("SELECT UNIX_TIMESTAMP(ts), UNIX_TIMESTAMP(NOW(6)), server_id from `heartbeat-test`.`heartbeat-test`")).WillReturnRows(rows)
rows := sqlmock.NewRows(tt.Columns).
AddRow("1487597613.001320", "1487598113.448042", 1)
mock.ExpectQuery(sanitizeQuery(tt.Query)).WillReturnRows(rows)

ch := make(chan prometheus.Metric)
go func() {
if err = (ScrapeHeartbeat{}).Scrape(context.Background(), db, ch, log.NewNopLogger()); err != nil {
t.Errorf("error calling function on test: %s", err)
}
close(ch)
}()
ch := make(chan prometheus.Metric)
go func() {
if err = (ScrapeHeartbeat{}).Scrape(context.Background(), db, ch, log.NewNopLogger()); err != nil {
t.Errorf("error calling function on test: %s", err)
}
close(ch)
}()

counterExpected := []MetricResult{
{labels: labelMap{"server_id": "1"}, value: 1487598113.448042, metricType: dto.MetricType_GAUGE},
{labels: labelMap{"server_id": "1"}, value: 1487597613.00132, metricType: dto.MetricType_GAUGE},
}
convey.Convey("Metrics comparison", t, func() {
for _, expect := range counterExpected {
got := readMetric(<-ch)
convey.So(got, convey.ShouldResemble, expect)
}
})
counterExpected := []MetricResult{
{labels: labelMap{"server_id": "1"}, value: 1487598113.448042, metricType: dto.MetricType_GAUGE},
{labels: labelMap{"server_id": "1"}, value: 1487597613.00132, metricType: dto.MetricType_GAUGE},
}
convey.Convey("Metrics comparison", t, func() {
for _, expect := range counterExpected {
got := readMetric(<-ch)
convey.So(got, convey.ShouldResemble, expect)
}
})

// Ensure all SQL queries were executed
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled exceptions: %s", err)
// Ensure all SQL queries were executed
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
})
}
}

0 comments on commit 5b5ba74

Please sign in to comment.