From 5b5ba746cf6679a55a2de51fb4c0eb369d9aabed Mon Sep 17 00:00:00 2001 From: Dima Kurguzov Date: Thu, 28 May 2020 09:29:37 +0200 Subject: [PATCH] Support heartbeats in UTC (#471) Signed-off-by: Dima Kurguzov --- README.md | 1 + collector/heartbeat.go | 16 +++++- collector/heartbeat_test.go | 99 +++++++++++++++++++++++-------------- 3 files changed, 78 insertions(+), 38 deletions(-) diff --git a/README.md b/README.md index 65a24465..f4fbc8ed 100644 --- a/README.md +++ b/README.md @@ -90,6 +90,7 @@ collect.slave_hosts | 5.1 | C collect.heartbeat | 5.1 | Collect from [heartbeat](#heartbeat). collect.heartbeat.database | 5.1 | Database from where to collect heartbeat data. (default: heartbeat) collect.heartbeat.table | 5.1 | Table from where to collect heartbeat data. (default: heartbeat) +collect.heartbeat.utc | 5.1 | Use UTC for timestamps of the current server (`pt-heartbeat` is called with `--utc`). (default: false) ### General Flags diff --git a/collector/heartbeat.go b/collector/heartbeat.go index 1c6cb582..35ba1c30 100644 --- a/collector/heartbeat.go +++ b/collector/heartbeat.go @@ -33,7 +33,7 @@ const ( // timestamps. %s will be replaced by the database and table name. // The second column allows gets the server timestamp at the exact same // time the query is run. - heartbeatQuery = "SELECT UNIX_TIMESTAMP(ts), UNIX_TIMESTAMP(NOW(6)), server_id from `%s`.`%s`" + heartbeatQuery = "SELECT UNIX_TIMESTAMP(ts), UNIX_TIMESTAMP(%s), server_id from `%s`.`%s`" ) var ( @@ -45,6 +45,10 @@ var ( "collect.heartbeat.table", "Table from where to collect heartbeat data", ).Default("heartbeat").String() + collectHeartbeatUtc = kingpin.Flag( + "collect.heartbeat.utc", + "Use UTC for timestamps of the current server (`pt-heartbeat` is called with `--utc`)", + ).Bool() ) // Metric descriptors. @@ -85,9 +89,17 @@ func (ScrapeHeartbeat) Version() float64 { return 5.1 } +// nowExpr returns a current timestamp expression. +func nowExpr() string { + if *collectHeartbeatUtc { + return "UTC_TIMESTAMP(6)" + } + return "NOW(6)" +} + // Scrape collects data from database connection and sends it over channel as prometheus metric. func (ScrapeHeartbeat) Scrape(ctx context.Context, db *sql.DB, ch chan<- prometheus.Metric, logger log.Logger) error { - query := fmt.Sprintf(heartbeatQuery, *collectHeartbeatDatabase, *collectHeartbeatTable) + query := fmt.Sprintf(heartbeatQuery, nowExpr(), *collectHeartbeatDatabase, *collectHeartbeatTable) heartbeatRows, err := db.QueryContext(ctx, query) if err != nil { return err diff --git a/collector/heartbeat_test.go b/collector/heartbeat_test.go index db4b1463..89576110 100644 --- a/collector/heartbeat_test.go +++ b/collector/heartbeat_test.go @@ -15,6 +15,7 @@ package collector import ( "context" + "fmt" "testing" "github.com/DATA-DOG/go-sqlmock" @@ -25,47 +26,73 @@ import ( "gopkg.in/alecthomas/kingpin.v2" ) +type ScrapeHeartbeatTestCase struct { + Args []string + Columns []string + Query string +} + +var ScrapeHeartbeatTestCases = []ScrapeHeartbeatTestCase{ + { + []string{ + "--collect.heartbeat.database", "heartbeat-test", + "--collect.heartbeat.table", "heartbeat-test", + }, + []string{"UNIX_TIMESTAMP(ts)", "UNIX_TIMESTAMP(NOW(6))", "server_id"}, + "SELECT UNIX_TIMESTAMP(ts), UNIX_TIMESTAMP(NOW(6)), server_id from `heartbeat-test`.`heartbeat-test`", + }, + { + []string{ + "--collect.heartbeat.database", "heartbeat-test", + "--collect.heartbeat.table", "heartbeat-test", + "--collect.heartbeat.utc", + }, + []string{"UNIX_TIMESTAMP(ts)", "UNIX_TIMESTAMP(UTC_TIMESTAMP(6))", "server_id"}, + "SELECT UNIX_TIMESTAMP(ts), UNIX_TIMESTAMP(UTC_TIMESTAMP(6)), server_id from `heartbeat-test`.`heartbeat-test`", + }, +} + func TestScrapeHeartbeat(t *testing.T) { - _, err := kingpin.CommandLine.Parse([]string{ - "--collect.heartbeat.database", "heartbeat-test", - "--collect.heartbeat.table", "heartbeat-test", - }) - if err != nil { - t.Fatal(err) - } + for _, tt := range ScrapeHeartbeatTestCases { + t.Run(fmt.Sprint(tt.Args), func(t *testing.T) { + _, err := kingpin.CommandLine.Parse(tt.Args) + if err != nil { + t.Fatal(err) + } - db, mock, err := sqlmock.New() - if err != nil { - t.Fatalf("error opening a stub database connection: %s", err) - } - defer db.Close() + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("error opening a stub database connection: %s", err) + } + defer db.Close() - columns := []string{"UNIX_TIMESTAMP(ts)", "UNIX_TIMESTAMP(NOW(6))", "server_id"} - rows := sqlmock.NewRows(columns). - AddRow("1487597613.001320", "1487598113.448042", 1) - mock.ExpectQuery(sanitizeQuery("SELECT UNIX_TIMESTAMP(ts), UNIX_TIMESTAMP(NOW(6)), server_id from `heartbeat-test`.`heartbeat-test`")).WillReturnRows(rows) + rows := sqlmock.NewRows(tt.Columns). + AddRow("1487597613.001320", "1487598113.448042", 1) + mock.ExpectQuery(sanitizeQuery(tt.Query)).WillReturnRows(rows) - ch := make(chan prometheus.Metric) - go func() { - if err = (ScrapeHeartbeat{}).Scrape(context.Background(), db, ch, log.NewNopLogger()); err != nil { - t.Errorf("error calling function on test: %s", err) - } - close(ch) - }() + ch := make(chan prometheus.Metric) + go func() { + if err = (ScrapeHeartbeat{}).Scrape(context.Background(), db, ch, log.NewNopLogger()); err != nil { + t.Errorf("error calling function on test: %s", err) + } + close(ch) + }() - counterExpected := []MetricResult{ - {labels: labelMap{"server_id": "1"}, value: 1487598113.448042, metricType: dto.MetricType_GAUGE}, - {labels: labelMap{"server_id": "1"}, value: 1487597613.00132, metricType: dto.MetricType_GAUGE}, - } - convey.Convey("Metrics comparison", t, func() { - for _, expect := range counterExpected { - got := readMetric(<-ch) - convey.So(got, convey.ShouldResemble, expect) - } - }) + counterExpected := []MetricResult{ + {labels: labelMap{"server_id": "1"}, value: 1487598113.448042, metricType: dto.MetricType_GAUGE}, + {labels: labelMap{"server_id": "1"}, value: 1487597613.00132, metricType: dto.MetricType_GAUGE}, + } + convey.Convey("Metrics comparison", t, func() { + for _, expect := range counterExpected { + got := readMetric(<-ch) + convey.So(got, convey.ShouldResemble, expect) + } + }) - // Ensure all SQL queries were executed - if err := mock.ExpectationsWereMet(); err != nil { - t.Errorf("there were unfulfilled exceptions: %s", err) + // Ensure all SQL queries were executed + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } + }) } }