From 5c159ece0801440317c4261a6f531236e190d4af Mon Sep 17 00:00:00 2001 From: "Lee E. Hinman" Date: Wed, 4 Dec 2024 13:57:24 -0600 Subject: [PATCH] Add unique per beat monitoring namespace --- filebeat/beater/filebeat.go | 14 +++++++-- heartbeat/monitors/mocks.go | 9 ++---- libbeat/beat/info.go | 7 +++-- libbeat/cmd/instance/beat.go | 36 ++++++++++++++++-------- x-pack/filebeat/input/benchmark/input.go | 10 +++---- 5 files changed, 49 insertions(+), 27 deletions(-) diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index ceab21aa3590..4909941b90ae 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -272,10 +272,18 @@ func (fb *Filebeat) Run(b *beat.Beat) error { waitEvents := newSignalWait() // count active events for waiting on shutdown + var reg *monitoring.Registry + + if b.Info.Monitoring.Namespace != nil { + reg = b.Info.Monitoring.Namespace.GetRegistry().GetRegistry("stats") + if reg == nil { + reg = b.Info.Monitoring.Namespace.GetRegistry().NewRegistry("stats") + } + } wgEvents := &eventCounter{ - count: monitoring.NewInt(nil, "filebeat.events.active"), // Gauge - added: monitoring.NewUint(nil, "filebeat.events.added"), - done: monitoring.NewUint(nil, "filebeat.events.done"), + count: monitoring.NewInt(reg, "filebeat.events.active"), // Gauge + added: monitoring.NewUint(reg, "filebeat.events.added"), + done: monitoring.NewUint(reg, "filebeat.events.done"), } finishedLogger := newFinishedLogger(wgEvents) diff --git a/heartbeat/monitors/mocks.go b/heartbeat/monitors/mocks.go index c172d24464c8..77dee19858a7 100644 --- a/heartbeat/monitors/mocks.go +++ b/heartbeat/monitors/mocks.go @@ -60,12 +60,8 @@ func makeMockFactory(pluginsReg *plugin.PluginsReg) (factory *RunnerFactory, sch EphemeralID: eid, FirstStart: time.Now(), StartTime: time.Now(), - Monitoring: struct { - DefaultUsername string - }{ - DefaultUsername: "test", - }, } + info.Monitoring.DefaultUsername = "test" sched = scheduler.Create( 1, @@ -246,7 +242,8 @@ func mockPluginBuilder() (plugin.PluginFactory, *atomic.Int, *atomic.Int) { return plugin.Plugin{Jobs: j, DoClose: closer, Endpoints: 1}, nil }, - Stats: plugin.NewPluginCountersRecorder("test", reg)}, + Stats: plugin.NewPluginCountersRecorder("test", reg), + }, built, closed } diff --git a/libbeat/beat/info.go b/libbeat/beat/info.go index 314597abbb56..7c3b5c0d90f8 100644 --- a/libbeat/beat/info.go +++ b/libbeat/beat/info.go @@ -22,6 +22,8 @@ import ( "github.com/gofrs/uuid/v5" "go.opentelemetry.io/collector/consumer" + + "github.com/elastic/elastic-agent-libs/monitoring" ) // Info stores a beats instance meta data. @@ -41,9 +43,10 @@ type Info struct { // Monitoring-related fields Monitoring struct { - DefaultUsername string // The default username to be used to connect to Elasticsearch Monitoring + DefaultUsername string // The default username to be used to connect to Elasticsearch Monitoring + Namespace *monitoring.Namespace // a monitor namespace that is unique per beat instance } - LogConsumer consumer.Logs //otel log consumer + LogConsumer consumer.Logs // otel log consumer } diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 6332ebac39b5..2d1eb3a20f02 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -337,6 +337,8 @@ func NewBeatReceiver(settings Settings, receiverConfig map[string]interface{}, c config.OverwriteConfigOpts(configOpts(store)) } + b.Beat.Info.Monitoring.Namespace = monitoring.GetNamespace(b.Info.Beat + "-" + b.Info.ID.String()) + instrumentation, err := instrumentation.New(cfg, b.Info.Beat, b.Info.Version) if err != nil { return nil, fmt.Errorf("error setting up instrumentation: %w", err) @@ -469,11 +471,6 @@ func NewBeatReceiver(settings Settings, receiverConfig map[string]interface{}, c return nil, fmt.Errorf("error creating processors: %w", err) } - reg := monitoring.Default.GetRegistry(b.Info.Name) - if reg == nil { - reg = monitoring.Default.NewRegistry(b.Info.Name) - } - // This should be replaced with static config for otel consumer // but need to figure out if we want the Queue settings from here. outputEnabled := b.Config.Output.IsSet() && b.Config.Output.Config().Enabled() @@ -485,12 +482,14 @@ func NewBeatReceiver(settings Settings, receiverConfig map[string]interface{}, c } } - tel := reg.GetRegistry("state") + uniq_reg := b.Beat.Info.Monitoring.Namespace.GetRegistry() + + tel := uniq_reg.GetRegistry("state") if tel == nil { - tel = reg.NewRegistry("state") + tel = uniq_reg.NewRegistry("state") } monitors := pipeline.Monitors{ - Metrics: reg, + Metrics: uniq_reg, Telemetry: tel, Logger: logp.L().Named("publisher"), Tracer: b.Instrumentation.Tracer(), @@ -510,7 +509,6 @@ func NewBeatReceiver(settings Settings, receiverConfig map[string]interface{}, c b.Publisher = publisher return b, nil - } // InitWithSettings does initialization of things common to all actions (read confs, flags) @@ -831,11 +829,27 @@ func (b *Beat) RegisterHostname(useFQDN bool) { hostname := b.Info.FQDNAwareHostname(useFQDN) // info.hostname - infoRegistry := monitoring.GetNamespace("info").GetRegistry() + var infoRegistry *monitoring.Registry + if b.Info.Monitoring.Namespace != nil { + infoRegistry = b.Info.Monitoring.Namespace.GetRegistry().GetRegistry("info") + if infoRegistry == nil { + infoRegistry = b.Info.Monitoring.Namespace.GetRegistry().NewRegistry("info") + } + } else { + infoRegistry = monitoring.GetNamespace("info").GetRegistry() + } monitoring.NewString(infoRegistry, "hostname").Set(hostname) // state.host - stateRegistry := monitoring.GetNamespace("state").GetRegistry() + var stateRegistry *monitoring.Registry + if b.Info.Monitoring.Namespace != nil { + stateRegistry = b.Info.Monitoring.Namespace.GetRegistry().GetRegistry("state") + if stateRegistry == nil { + stateRegistry = b.Info.Monitoring.Namespace.GetRegistry().NewRegistry("state") + } + } else { + stateRegistry = monitoring.GetNamespace("state").GetRegistry() + } monitoring.NewFunc(stateRegistry, "host", host.ReportInfo(hostname), monitoring.Report) } diff --git a/x-pack/filebeat/input/benchmark/input.go b/x-pack/filebeat/input/benchmark/input.go index dd6d198cc409..e098d3e746bc 100644 --- a/x-pack/filebeat/input/benchmark/input.go +++ b/x-pack/filebeat/input/benchmark/input.go @@ -60,7 +60,7 @@ func (bi *benchmarkInput) Test(ctx v2.TestContext) error { // Run starts the data generation. func (bi *benchmarkInput) Run(ctx v2.Context, publisher stateless.Publisher) error { var wg sync.WaitGroup - metrics := newInputMetrics(ctx.ID) + metrics := newInputMetrics(ctx) for i := uint8(0); i < bi.cfg.Threads; i++ { wg.Add(1) @@ -103,8 +103,8 @@ func runThread(ctx v2.Context, publisher stateless.Publisher, thread uint8, cfg ticker.Stop() return case <-ticker.C: - //don't want to block on filling doPublish channel - //so only send as many as it can hold right now + // don't want to block on filling doPublish channel + // so only send as many as it can hold right now numToSend := cap(pubChan) - len(pubChan) for i := 0; i < numToSend; i++ { pubChan <- true @@ -157,8 +157,8 @@ type inputMetrics struct { } // newInputMetrics returns an input metric for the benchmark processor. -func newInputMetrics(id string) *inputMetrics { - reg, unreg := inputmon.NewInputRegistry(inputName, id, nil) +func newInputMetrics(ctx v2.Context) *inputMetrics { + reg, unreg := inputmon.NewInputRegistry(inputName, ctx.ID, ctx.Agent.Monitoring.Namespace.GetRegistry()) out := &inputMetrics{ unregister: unreg, eventsPublished: monitoring.NewUint(reg, "events_published_total"),