diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index e01d46724101..f944edc7d4ba 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -134,6 +134,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] - Azure Billing: upgrade Usage Details API to version 2019-10-01 {pull}31970[31970] * Differentiate between actual idle CPU states and an uninterruptible disk sleep. https://github.com/elastic/elastic-agent-system-metrics/pull/32[system-metrics#32] - AWS Fargate: Added support for DesiredStatus and KnownStatus {issue}32077[32077] {pull}32342[#32342] +- Enable Generic SQL merge metrics to a single event for sql_queries using new flag {pull}32394[32394] *Packetbeat* diff --git a/metricbeat/docs/modules/sql.asciidoc b/metricbeat/docs/modules/sql.asciidoc index 0c8b1b6bc892..987bc5556e78 100644 --- a/metricbeat/docs/modules/sql.asciidoc +++ b/metricbeat/docs/modules/sql.asciidoc @@ -484,6 +484,72 @@ The example generates this event: By disabling the flag `raw_data.enabled`, whic } ---- +[float] +=== Example: Merge multiple queries to single event. + +Multiple queries will create multiple events, one for each query. It may be preferrable to create a single event by combining the metrics together in a single event. + +This feature can be enabled using the `merge_results` config. + +However, such a merge is possible only if the table queries being merged, each produce a single row. + +For example: + +[source,yaml] +---- +- module: sql + metricsets: + - query + period: 10s + hosts: ["postgres://postgres:postgres@localhost?sslmode=disable"] + + driver: "postgres" + raw_data.enabled: true + merge_results: true + sql_queries: + - query: "SELECT blks_hit,blks_read FROM pg_stat_database limit 1;" + response_format: table + - query: "select checkpoints_timed,checkpoints_req from pg_stat_bgwriter;" + response_format: table +---- + +This creates a combined event as below, where `blks_hit`, `blks_read`, `checkpoints_timed` and `checkpoints_req` are part of same event. + +[source,json] +---- +{ + "@timestamp": "2022-07-21T07:07:06.747Z", + "agent": { + "name": "Lalits-MBP-2", + "type": "metricbeat", + "version": "8.4.0", + "ephemeral_id": "b0867287-e56a-492f-b421-0ac870c426f9", + "id": "3fe7b378-6f9e-4ca3-9aa1-067c4a6866e5" + }, + "metricset": { + "period": 10000, + "name": "query" + }, + "service": { + "type": "sql", + "address": "localhost" + }, + "sql": { + "metrics": { + "blks_read": 21, + "checkpoints_req": 1, + "checkpoints_timed": 66, + "blks_hit": 7592 + }, + "driver": "postgres" + }, + "event": { + "module": "sql", + "duration": 18883084, + "dataset": "sql.query" + } +} +---- :edit_url: diff --git a/x-pack/metricbeat/module/sql/_meta/docs.asciidoc b/x-pack/metricbeat/module/sql/_meta/docs.asciidoc index 3b795ae4ff73..08deea8196ef 100644 --- a/x-pack/metricbeat/module/sql/_meta/docs.asciidoc +++ b/x-pack/metricbeat/module/sql/_meta/docs.asciidoc @@ -469,3 +469,70 @@ The example generates this event: By disabling the flag `raw_data.enabled`, whic } } ---- + +[float] +=== Example: Merge multiple queries to single event. + +Multiple queries will create multiple events, one for each query. It may be preferrable to create a single event by combining the metrics together in a single event. + +This feature can be enabled using the `merge_results` config. + +However, such a merge is possible only if the table queries being merged, each produce a single row. + +For example: + +[source,yaml] +---- +- module: sql + metricsets: + - query + period: 10s + hosts: ["postgres://postgres:postgres@localhost?sslmode=disable"] + + driver: "postgres" + raw_data.enabled: true + merge_results: true + sql_queries: + - query: "SELECT blks_hit,blks_read FROM pg_stat_database limit 1;" + response_format: table + - query: "select checkpoints_timed,checkpoints_req from pg_stat_bgwriter;" + response_format: table +---- + +This creates a combined event as below, where `blks_hit`, `blks_read`, `checkpoints_timed` and `checkpoints_req` are part of same event. + +[source,json] +---- +{ + "@timestamp": "2022-07-21T07:07:06.747Z", + "agent": { + "name": "Lalits-MBP-2", + "type": "metricbeat", + "version": "8.4.0", + "ephemeral_id": "b0867287-e56a-492f-b421-0ac870c426f9", + "id": "3fe7b378-6f9e-4ca3-9aa1-067c4a6866e5" + }, + "metricset": { + "period": 10000, + "name": "query" + }, + "service": { + "type": "sql", + "address": "localhost" + }, + "sql": { + "metrics": { + "blks_read": 21, + "checkpoints_req": 1, + "checkpoints_timed": 66, + "blks_hit": 7592 + }, + "driver": "postgres" + }, + "event": { + "module": "sql", + "duration": 18883084, + "dataset": "sql.query" + } +} +---- \ No newline at end of file diff --git a/x-pack/metricbeat/module/sql/query/query.go b/x-pack/metricbeat/module/sql/query/query.go index 43a8ac02938d..7f47f565fc2b 100644 --- a/x-pack/metricbeat/module/sql/query/query.go +++ b/x-pack/metricbeat/module/sql/query/query.go @@ -49,7 +49,8 @@ type config struct { ResponseFormat string `config:"sql_response_format"` Query string `config:"sql_query" ` - Queries []query `config:"sql_queries" ` + Queries []query `config:"sql_queries" ` + MergeResults bool `config:"merge_results"` } // MetricSet holds any configuration or state information. It must implement @@ -124,6 +125,8 @@ func (m *MetricSet) Fetch(ctx context.Context, reporter mb.ReporterV2) error { queries = append(queries, one_query) } + merged := mapstr.M{} + for _, q := range queries { if q.ResponseFormat == tableResponseFormat { // Table format @@ -133,7 +136,22 @@ func (m *MetricSet) Fetch(ctx context.Context, reporter mb.ReporterV2) error { } for _, ms := range mss { - m.reportEvent(ms, reporter, q.Query) + if m.Config.MergeResults { + if len(mss) > 1 { + return fmt.Errorf("can not merge query resulting with more than one rows: %s", q) + } else { + for k, v := range ms { + _, ok := merged[k] + if ok { + m.Logger().Warn("overwriting duplicate metrics: ", k) + } + merged[k] = v + } + } + } else { + // Report immediately for non-merged cases. + m.reportEvent(ms, reporter, q.Query) + } } } else { // Variable format @@ -142,9 +160,24 @@ func (m *MetricSet) Fetch(ctx context.Context, reporter mb.ReporterV2) error { return fmt.Errorf("fetch variable mode failed: %w", err) } - m.reportEvent(ms, reporter, q.Query) + if m.Config.MergeResults { + for k, v := range ms { + _, ok := merged[k] + if ok { + m.Logger().Warn("overwriting duplicate metrics: ", k) + } + merged[k] = v + } + } else { + // Report immediately for non-merged cases. + m.reportEvent(ms, reporter, q.Query) + } } } + if m.Config.MergeResults { + // Report here for merged case. + m.reportEvent(merged, reporter, "") + } return nil } @@ -154,20 +187,32 @@ func (m *MetricSet) Fetch(ctx context.Context, reporter mb.ReporterV2) error { func (m *MetricSet) reportEvent(ms mapstr.M, reporter mb.ReporterV2, qry string) { if m.Config.RawData.Enabled { - reporter.Event(mb.Event{ - // New usage. - // Only driver & query field mapped. - // metrics to be mapped by end user. - ModuleFields: mapstr.M{ - "metrics": ms, // Individual metric - "driver": m.Config.Driver, - "query": qry, - }, - }) + // New usage. + // Only driver & query field mapped. + // metrics to be mapped by end user. + if len(qry) > 0 { + // set query. + reporter.Event(mb.Event{ + ModuleFields: mapstr.M{ + "metrics": ms, // Individual metric + "driver": m.Config.Driver, + "query": qry, + }, + }) + } else { + reporter.Event(mb.Event{ + // Do not set query. + ModuleFields: mapstr.M{ + "metrics": ms, // Individual metric + "driver": m.Config.Driver, + }, + }) + + } } else { + // Previous usage. Backword compartibility. + // Supports field mapping. reporter.Event(mb.Event{ - // Previous usage. Backword compartibility. - // Supports field mapping. ModuleFields: mapstr.M{ "driver": m.Config.Driver, "query": qry, diff --git a/x-pack/metricbeat/module/sql/query/query_integration_test.go b/x-pack/metricbeat/module/sql/query/query_integration_test.go index acc9bb48f68a..fb7692c17c0e 100644 --- a/x-pack/metricbeat/module/sql/query/query_integration_test.go +++ b/x-pack/metricbeat/module/sql/query/query_integration_test.go @@ -158,10 +158,14 @@ func TestPostgreSQL(t *testing.T) { Assertion: func(t *testing.T, event beat.Event) { value, err := event.GetValue("sql.query") assert.NoError(t, err) - require.NotEmpty(t, value.(map[string]interface{})) + require.NotEmpty(t, value) }, } + t.Run("fetch with URL", func(t *testing.T) { + testFetch(t, cfg) + }) + }) t.Run("table mode", func(t *testing.T) { @@ -178,10 +182,46 @@ func TestPostgreSQL(t *testing.T) { Assertion: func(t *testing.T, event beat.Event) { value, err := event.GetValue("sql.query") assert.NoError(t, err) - require.NotEmpty(t, value.(map[string]interface{})) + require.NotEmpty(t, value) + }, + } + + t.Run("fetch with URL", func(t *testing.T) { + testFetch(t, cfg) + }) + + }) + + t.Run("merged mode", func(t *testing.T) { + cfg = testFetchConfig{ + config: config{ + Driver: "postgres", + Queries: []query{ + query{Query: "SELECT blks_hit FROM pg_stat_database limit 1;", ResponseFormat: "table"}, + query{Query: "SELECT blks_read FROM pg_stat_database limit 1;", ResponseFormat: "table"}, + }, + ResponseFormat: tableResponseFormat, + RawData: rawData{ + Enabled: true, + }, + MergeResults: true, + }, + Host: fmt.Sprintf("postgres://%s:%s@%s:%s/?sslmode=disable", user, password, host, port), + Assertion: func(t *testing.T, event beat.Event) { + // Ensure both merged fields are there in a single event. + value1, err1 := event.GetValue("sql.metrics.blks_hit") + assert.NoError(t, err1) + require.NotEmpty(t, value1) + value2, err2 := event.GetValue("sql.metrics.blks_read") + assert.NoError(t, err2) + require.NotEmpty(t, value2) }, } + t.Run("fetch with URL", func(t *testing.T) { + testFetch(t, cfg) + }) + }) }) } @@ -212,7 +252,9 @@ func getConfig(cfg testFetchConfig) map[string]interface{} { "hosts": []string{cfg.Host}, "driver": cfg.config.Driver, "sql_query": cfg.config.Query, + "sql_queries": cfg.config.Queries, "raw_data.enabled": cfg.config.RawData.Enabled, + "merge_results": cfg.config.MergeResults, } if cfg.config.ResponseFormat != "" {