From abece6729fafc5a8fb3b5c04b1cc3198345c2171 Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Tue, 4 Feb 2020 15:09:53 +0100 Subject: [PATCH] Reuse connections on sql module (#16001) (#16027) Keep a connection open with the database instead of opening and closing connections on each fetch. this is consistent with other specific SQL modules we have. (cherry picked from commit 583a89ccc71997208c5d4fc403a7ba7d45a46f71) --- CHANGELOG.next.asciidoc | 1 + x-pack/metricbeat/module/sql/query/query.go | 40 ++++++++++++++++----- 2 files changed, 32 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 96bb75681d95..2381007708bd 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -204,6 +204,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Make the `system/cpu` metricset collect normalized CPU metrics by default. {issue}15618[15618] {pull}15729[15729] - Add citadel metricset for Istio Metricbeat module {pull}15990[15990] - Add support for processors in light modules. {issue}14740[14740] {pull}15923[15923] +- Reuse connections in SQL module. {pull}16001[16001] *Packetbeat* diff --git a/x-pack/metricbeat/module/sql/query/query.go b/x-pack/metricbeat/module/sql/query/query.go index 7a0c66de9da2..5ff543aa681d 100644 --- a/x-pack/metricbeat/module/sql/query/query.go +++ b/x-pack/metricbeat/module/sql/query/query.go @@ -5,6 +5,7 @@ package query import ( + "context" "fmt" "strconv" "strings" @@ -43,6 +44,8 @@ type MetricSet struct { Driver string Query string ResponseFormat string + + db *sqlx.DB } // New creates a new instance of the MetricSet. New is responsible for unpacking @@ -77,19 +80,13 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // of an error set the Error field of mb.Event or simply call report.Error(). // It calls m.fetchTableMode() or m.fetchVariableMode() depending on the response // format of the query. -func (m *MetricSet) Fetch(report mb.ReporterV2) error { - db, err := sqlx.Open(m.Driver, m.HostData().URI) +func (m *MetricSet) Fetch(ctx context.Context, report mb.ReporterV2) error { + db, err := m.DB() if err != nil { return errors.Wrap(err, "error opening connection") } - defer db.Close() - - err = db.Ping() - if err != nil { - return errors.Wrap(err, "error testing connection") - } - rows, err := db.Queryx(m.Query) + rows, err := db.QueryxContext(ctx, m.Query) if err != nil { return errors.Wrap(err, "error executing query") } @@ -102,6 +99,23 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) error { return m.fetchVariableMode(rows, report) } +// DB gets a client ready to query the database +func (m *MetricSet) DB() (*sqlx.DB, error) { + if m.db == nil { + db, err := sqlx.Open(m.Driver, m.HostData().URI) + if err != nil { + return nil, errors.Wrap(err, "opening connection") + } + err = db.Ping() + if err != nil { + return nil, errors.Wrap(err, "testing connection") + } + + m.db = db + } + return m.db, nil +} + // fetchTableMode scan the rows and publishes the event for querys that return the response in a table format. func (m *MetricSet) fetchTableMode(rows *sqlx.Rows, report mb.ReporterV2) error { @@ -229,3 +243,11 @@ func getValue(pval *interface{}) string { return fmt.Sprint(v) } } + +// Close closes the connection pool releasing its resources +func (m *MetricSet) Close() error { + if m.db == nil { + return nil + } + return errors.Wrap(m.db.Close(), "closing connection") +}