Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generic SQL join results of multiple queries #32394

Merged
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Azure Billing: upgrade Usage Details API to version 2019-10-01 {pull}31970[31970]
* Differentiate between actual idle CPU states and an uninterruptible disk sleep. https://github.com/elastic/elastic-agent-system-metrics/pull/32[system-metrics#32]
- AWS Fargate: Added support for DesiredStatus and KnownStatus {issue}32077[32077] {pull}32342[#32342]
- Enable Generic SQL merge metrics to a single event for sql_queries using new flag {pull}32394[32394]

*Packetbeat*

Expand Down
66 changes: 66 additions & 0 deletions metricbeat/docs/modules/sql.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,72 @@ The example generates this event: By disabling the flag `raw_data.enabled`, whic
}
----

[float]
=== Example: Merge multiple queries to single event.

Multiple queries will create multiple events, one for each query. It may be preferrable to create a single event by combining the metrics together in a single event.

This feature can be enabled using the `merge_results` config.

However, such a merge is possible only if the table queries being merged, each produce a single row.

For example:

[source,yaml]
----
- module: sql
metricsets:
- query
period: 10s
hosts: ["postgres://postgres:postgres@localhost?sslmode=disable"]

driver: "postgres"
raw_data.enabled: true
merge_results: true
sql_queries:
- query: "SELECT blks_hit,blks_read FROM pg_stat_database limit 1;"
response_format: table
- query: "select checkpoints_timed,checkpoints_req from pg_stat_bgwriter;"
response_format: table
----

This creates a combined event as below, where `blks_hit`, `blks_read`, `checkpoints_timed` and `checkpoints_req` are part of same event.

[source,json]
----
{
"@timestamp": "2022-07-21T07:07:06.747Z",
"agent": {
"name": "Lalits-MBP-2",
"type": "metricbeat",
"version": "8.4.0",
"ephemeral_id": "b0867287-e56a-492f-b421-0ac870c426f9",
"id": "3fe7b378-6f9e-4ca3-9aa1-067c4a6866e5"
},
"metricset": {
"period": 10000,
"name": "query"
},
"service": {
"type": "sql",
"address": "localhost"
},
"sql": {
"metrics": {
"blks_read": 21,
"checkpoints_req": 1,
"checkpoints_timed": 66,
"blks_hit": 7592
},
"driver": "postgres"
},
"event": {
"module": "sql",
"duration": 18883084,
"dataset": "sql.query"
}
}
----

:edit_url:

Expand Down
67 changes: 67 additions & 0 deletions x-pack/metricbeat/module/sql/_meta/docs.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -469,3 +469,70 @@ The example generates this event: By disabling the flag `raw_data.enabled`, whic
}
}
----

[float]
=== Example: Merge multiple queries to single event.

Multiple queries will create multiple events, one for each query. It may be preferrable to create a single event by combining the metrics together in a single event.

This feature can be enabled using the `merge_results` config.

However, such a merge is possible only if the table queries being merged, each produce a single row.

For example:

[source,yaml]
----
- module: sql
metricsets:
- query
period: 10s
hosts: ["postgres://postgres:postgres@localhost?sslmode=disable"]

driver: "postgres"
raw_data.enabled: true
merge_results: true
sql_queries:
- query: "SELECT blks_hit,blks_read FROM pg_stat_database limit 1;"
response_format: table
- query: "select checkpoints_timed,checkpoints_req from pg_stat_bgwriter;"
response_format: table
----

This creates a combined event as below, where `blks_hit`, `blks_read`, `checkpoints_timed` and `checkpoints_req` are part of same event.

[source,json]
----
{
"@timestamp": "2022-07-21T07:07:06.747Z",
"agent": {
"name": "Lalits-MBP-2",
"type": "metricbeat",
"version": "8.4.0",
"ephemeral_id": "b0867287-e56a-492f-b421-0ac870c426f9",
"id": "3fe7b378-6f9e-4ca3-9aa1-067c4a6866e5"
},
"metricset": {
"period": 10000,
"name": "query"
},
"service": {
"type": "sql",
"address": "localhost"
},
"sql": {
"metrics": {
"blks_read": 21,
"checkpoints_req": 1,
"checkpoints_timed": 66,
"blks_hit": 7592
},
"driver": "postgres"
},
"event": {
"module": "sql",
"duration": 18883084,
"dataset": "sql.query"
}
}
----
75 changes: 60 additions & 15 deletions x-pack/metricbeat/module/sql/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ type config struct {
ResponseFormat string `config:"sql_response_format"`
Query string `config:"sql_query" `

Queries []query `config:"sql_queries" `
Queries []query `config:"sql_queries" `
MergeResults bool `config:"merge_results"`
}

// MetricSet holds any configuration or state information. It must implement
Expand Down Expand Up @@ -124,6 +125,8 @@ func (m *MetricSet) Fetch(ctx context.Context, reporter mb.ReporterV2) error {
queries = append(queries, one_query)
}

merged := mapstr.M{}

for _, q := range queries {
if q.ResponseFormat == tableResponseFormat {
// Table format
Expand All @@ -133,7 +136,22 @@ func (m *MetricSet) Fetch(ctx context.Context, reporter mb.ReporterV2) error {
}

for _, ms := range mss {
m.reportEvent(ms, reporter, q.Query)
if m.Config.MergeResults {
if len(mss) > 1 {
return fmt.Errorf("can not merge query resulting with more than one rows: %s", q)
} else {
for k, v := range ms {
_, ok := merged[k]
if ok {
m.Logger().Warn("overwriting duplicate metrics: ", k)
}
merged[k] = v
}
}
} else {
// Report immediately for non-merged cases.
m.reportEvent(ms, reporter, q.Query)
}
}
} else {
// Variable format
Expand All @@ -142,9 +160,24 @@ func (m *MetricSet) Fetch(ctx context.Context, reporter mb.ReporterV2) error {
return fmt.Errorf("fetch variable mode failed: %w", err)
}

m.reportEvent(ms, reporter, q.Query)
if m.Config.MergeResults {
for k, v := range ms {
_, ok := merged[k]
if ok {
m.Logger().Warn("overwriting duplicate metrics: ", k)
}
merged[k] = v
}
} else {
// Report immediately for non-merged cases.
m.reportEvent(ms, reporter, q.Query)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit. This is getting more complicated now, maybe the code for variable and table format could be moved to separated functions.

}
}
if m.Config.MergeResults {
// Report here for merged case.
m.reportEvent(merged, reporter, "")
}

return nil
}
Expand All @@ -154,20 +187,32 @@ func (m *MetricSet) Fetch(ctx context.Context, reporter mb.ReporterV2) error {
func (m *MetricSet) reportEvent(ms mapstr.M, reporter mb.ReporterV2, qry string) {
if m.Config.RawData.Enabled {

Copy link
Member

@jsoriano jsoriano Jul 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another way to do this would be:

moduleFields := mapstr.M{
	"metrics": ms, // Individual metric
	"driver":  m.Config.Driver,
}
if qry != "" {
	moduleFields["query"] = qry
}
reporter.Event(mb.Event{
	ModuleFields: moduleFields,
})

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I hope we can stay with the current structure.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean, you can do it this way in case you feel it complicates less the code 🙂 #32394 (comment)

reporter.Event(mb.Event{
// New usage.
// Only driver & query field mapped.
// metrics to be mapped by end user.
ModuleFields: mapstr.M{
"metrics": ms, // Individual metric
"driver": m.Config.Driver,
"query": qry,
},
})
// New usage.
// Only driver & query field mapped.
// metrics to be mapped by end user.
if len(qry) > 0 {
// set query.
reporter.Event(mb.Event{
ModuleFields: mapstr.M{
"metrics": ms, // Individual metric
"driver": m.Config.Driver,
"query": qry,
},
})
} else {
reporter.Event(mb.Event{
// Do not set query.
ModuleFields: mapstr.M{
"metrics": ms, // Individual metric
"driver": m.Config.Driver,
},
})

}
} else {
// Previous usage. Backword compartibility.
// Supports field mapping.
reporter.Event(mb.Event{
// Previous usage. Backword compartibility.
// Supports field mapping.
ModuleFields: mapstr.M{
"driver": m.Config.Driver,
"query": qry,
Expand Down
46 changes: 44 additions & 2 deletions x-pack/metricbeat/module/sql/query/query_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,14 @@ func TestPostgreSQL(t *testing.T) {
Assertion: func(t *testing.T, event beat.Event) {
value, err := event.GetValue("sql.query")
assert.NoError(t, err)
require.NotEmpty(t, value.(map[string]interface{}))
require.NotEmpty(t, value)
},
}

t.Run("fetch with URL", func(t *testing.T) {
testFetch(t, cfg)
})

})

t.Run("table mode", func(t *testing.T) {
Expand All @@ -178,10 +182,46 @@ func TestPostgreSQL(t *testing.T) {
Assertion: func(t *testing.T, event beat.Event) {
value, err := event.GetValue("sql.query")
assert.NoError(t, err)
require.NotEmpty(t, value.(map[string]interface{}))
require.NotEmpty(t, value)
},
}

t.Run("fetch with URL", func(t *testing.T) {
testFetch(t, cfg)
})

})

t.Run("merged mode", func(t *testing.T) {
cfg = testFetchConfig{
config: config{
Driver: "postgres",
Queries: []query{
query{Query: "SELECT blks_hit FROM pg_stat_database limit 1;", ResponseFormat: "table"},
query{Query: "SELECT blks_read FROM pg_stat_database limit 1;", ResponseFormat: "table"},
},
ResponseFormat: tableResponseFormat,
RawData: rawData{
Enabled: true,
},
MergeResults: true,
},
Host: fmt.Sprintf("postgres://%s:%s@%s:%s/?sslmode=disable", user, password, host, port),
Assertion: func(t *testing.T, event beat.Event) {
// Ensure both merged fields are there in a single event.
value1, err1 := event.GetValue("sql.metrics.blks_hit")
assert.NoError(t, err1)
require.NotEmpty(t, value1)
value2, err2 := event.GetValue("sql.metrics.blks_read")
assert.NoError(t, err2)
require.NotEmpty(t, value2)
},
}

t.Run("fetch with URL", func(t *testing.T) {
testFetch(t, cfg)
})

})
})
}
Expand Down Expand Up @@ -212,7 +252,9 @@ func getConfig(cfg testFetchConfig) map[string]interface{} {
"hosts": []string{cfg.Host},
"driver": cfg.config.Driver,
"sql_query": cfg.config.Query,
"sql_queries": cfg.config.Queries,
"raw_data.enabled": cfg.config.RawData.Enabled,
"merge_results": cfg.config.MergeResults,
}

if cfg.config.ResponseFormat != "" {
Expand Down