Skip to content

Commit

Permalink
Auditbeat,Metricbeat - add /inputs/ to HTTP monitoring endpoint (elas…
Browse files Browse the repository at this point in the history
…tic#36971)

Make metrics published by "inputs" available through the /inputs/ route
on the HTTP monitoring endpoint of Auditbeat and Metricbeat.

For Agent, include a snapshot of those metrics within the Agent diagnostics
bundle as "input_metrics.json".

When running under Agent, each module instance is configured with only a single
metricset. That module is given a unique `id`. That ID is what will be used
as the `id` within the /inputs/ data. And that `id` will also be added as context to the
logger that is passed into every metricset so that any log messages from a metricset
can be associated back to the agent stream ID).

Relates elastic#36945

Remove module and metricset keys from metricset metrics. 
For the `/inputs/` API, `input` is they key used to identify the type of "input" running.
The `module` and `metricset` keys become redundant with the addition of `input`.
I don't know of anything that relies on those fields.
  • Loading branch information
andrewkroh authored and Scholar-Li committed Feb 5, 2024
1 parent de75a67 commit de652c4
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 6 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ is collected by it.
*Auditbeat*

- Upgrade go-libaudit to v2.4.0. {issue}36776[36776] {pull}36964[36964]
- Add a `/inputs/` route to the HTTP monitoring endpoint that exposes metrics for each dataset instance. {pull}36971[36971]

*Libbeat*

Expand All @@ -268,6 +269,7 @@ is collected by it.
- Add GCP Carbon Footprint metricbeat data {pull}34820[34820]
- Add event loop utilization metric to Kibana module {pull}35020[35020]
- Align on the algorithm used to transform Prometheus histograms into Elasticsearch histograms {pull}36647[36647]
- Add a `/inputs/` route to the HTTP monitoring endpoint that exposes metrics for each metricset instance. {pull}36971[36971]


*Osquerybeat*
Expand Down
19 changes: 19 additions & 0 deletions metricbeat/beater/metricbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/management"
"github.com/elastic/beats/v7/libbeat/monitoring/inputmon"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/mb/module"
conf "github.com/elastic/elastic-agent-libs/config"
Expand Down Expand Up @@ -155,6 +156,24 @@ func newMetricbeat(b *beat.Beat, c *conf.C, options ...Option) (*Metricbeat, err
return metricbeat, nil
}

if b.API != nil {
if err := inputmon.AttachHandler(b.API.Router()); err != nil {
return nil, fmt.Errorf("failed attach inputs api to monitoring endpoint server: %w", err)
}
}

if b.Manager != nil {
b.Manager.RegisterDiagnosticHook("input_metrics", "Metrics from active inputs.",
"input_metrics.json", "application/json", func() []byte {
data, err := inputmon.MetricSnapshotJSON()
if err != nil {
logp.L().Warnw("Failed to collect input metric snapshot for Agent diagnostics.", "error", err)
return []byte(err.Error())
}
return data
})
}

moduleOptions := append(
[]module.Option{module.WithMaxStartDelay(config.MaxStartDelay)},
metricbeat.moduleOptions...)
Expand Down
21 changes: 17 additions & 4 deletions metricbeat/mb/builders.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,20 +182,33 @@ func newBaseMetricSets(r *Register, m Module) ([]BaseMetricSet, error) {
}
msID := id.String()
metrics := monitoring.NewRegistry()
monitoring.NewString(metrics, "module").Set(m.Name())
monitoring.NewString(metrics, "metricset").Set(name)
monitoring.NewString(metrics, "input").Set(m.Name() + "/" + name)
if host != "" {
monitoring.NewString(metrics, "host").Set(host)
}
monitoring.NewString(metrics, "id").Set(msID)
monitoring.NewString(metrics, "ephemeral_id").Set(msID)
if configuredID := m.Config().ID; configuredID != "" {
// If a module ID was configured, then use that as the ID within metrics.
// Note that the "ephemeral_id" is what is used as the monitoring registry
// key. This module ID is not unique to the MetricSet instance when multiple
// hosts are monitored or if multiple different MetricSet types were enabled
// under the same module instance.
monitoring.NewString(metrics, "id").Set(configuredID)
} else {
monitoring.NewString(metrics, "id").Set(msID)
}

logger := logp.NewLogger(m.Name() + "." + name)
if m.Config().ID != "" {
logger = logger.With("id", m.Config().ID)
}
metricsets = append(metricsets, BaseMetricSet{
id: msID,
name: name,
module: m,
host: host,
metrics: metrics,
logger: logp.NewLogger(m.Name() + "." + name),
logger: logger,
})
}
}
Expand Down
5 changes: 3 additions & 2 deletions metricbeat/mb/mb.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,7 @@ func (b *BaseMetricSet) Registration() MetricSetRegistration {
// the metricset fetches not only the predefined fields but add alls raw data under
// the raw namespace to the event.
type ModuleConfig struct {
ID string `config:"id"` // Optional ID (not guaranteed to be unique).
Hosts []string `config:"hosts"`
Period time.Duration `config:"period" validate:"positive"`
Timeout time.Duration `config:"timeout" validate:"positive"`
Expand All @@ -375,8 +376,8 @@ type ModuleConfig struct {

func (c ModuleConfig) String() string {
return fmt.Sprintf(`{Module:"%v", MetricSets:%v, Enabled:%v, `+
`Hosts:[%v hosts], Period:"%v", Timeout:"%v", Raw:%v, Query:%v}`,
c.Module, c.MetricSets, c.Enabled, len(c.Hosts), c.Period, c.Timeout,
`ID:"%s", Hosts:[%v hosts], Period:"%v", Timeout:"%v", Raw:%v, Query:%v}`,
c.Module, c.MetricSets, c.Enabled, c.ID, len(c.Hosts), c.Period, c.Timeout,
c.Raw, c.Query)
}

Expand Down

0 comments on commit de652c4

Please sign in to comment.