Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add unique per beat monitoring namespace #41939

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,10 +272,18 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
waitEvents := newSignalWait()

// count active events for waiting on shutdown
var reg *monitoring.Registry

if b.Info.Monitoring.Namespace != nil {
reg = b.Info.Monitoring.Namespace.GetRegistry().GetRegistry("stats")
if reg == nil {
reg = b.Info.Monitoring.Namespace.GetRegistry().NewRegistry("stats")
}
}
wgEvents := &eventCounter{
count: monitoring.NewInt(nil, "filebeat.events.active"), // Gauge
added: monitoring.NewUint(nil, "filebeat.events.added"),
done: monitoring.NewUint(nil, "filebeat.events.done"),
count: monitoring.NewInt(reg, "filebeat.events.active"), // Gauge
added: monitoring.NewUint(reg, "filebeat.events.added"),
done: monitoring.NewUint(reg, "filebeat.events.done"),
}
finishedLogger := newFinishedLogger(wgEvents)

Expand Down
9 changes: 3 additions & 6 deletions heartbeat/monitors/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,8 @@ func makeMockFactory(pluginsReg *plugin.PluginsReg) (factory *RunnerFactory, sch
EphemeralID: eid,
FirstStart: time.Now(),
StartTime: time.Now(),
Monitoring: struct {
DefaultUsername string
}{
DefaultUsername: "test",
},
}
info.Monitoring.DefaultUsername = "test"

sched = scheduler.Create(
1,
Expand Down Expand Up @@ -246,7 +242,8 @@ func mockPluginBuilder() (plugin.PluginFactory, *atomic.Int, *atomic.Int) {

return plugin.Plugin{Jobs: j, DoClose: closer, Endpoints: 1}, nil
},
Stats: plugin.NewPluginCountersRecorder("test", reg)},
Stats: plugin.NewPluginCountersRecorder("test", reg),
},
built,
closed
}
Expand Down
7 changes: 5 additions & 2 deletions libbeat/beat/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (

"github.com/gofrs/uuid/v5"
"go.opentelemetry.io/collector/consumer"

"github.com/elastic/elastic-agent-libs/monitoring"
)

// Info stores a beats instance meta data.
Expand All @@ -41,9 +43,10 @@ type Info struct {

// Monitoring-related fields
Monitoring struct {
DefaultUsername string // The default username to be used to connect to Elasticsearch Monitoring
DefaultUsername string // The default username to be used to connect to Elasticsearch Monitoring
Namespace *monitoring.Namespace // a monitor namespace that is unique per beat instance
}
LogConsumer consumer.Logs //otel log consumer
LogConsumer consumer.Logs // otel log consumer

}

Expand Down
36 changes: 25 additions & 11 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,8 @@ func NewBeatReceiver(settings Settings, receiverConfig map[string]interface{}, c
config.OverwriteConfigOpts(configOpts(store))
}

b.Beat.Info.Monitoring.Namespace = monitoring.GetNamespace(b.Info.Beat + "-" + b.Info.ID.String())

instrumentation, err := instrumentation.New(cfg, b.Info.Beat, b.Info.Version)
if err != nil {
return nil, fmt.Errorf("error setting up instrumentation: %w", err)
Expand Down Expand Up @@ -469,11 +471,6 @@ func NewBeatReceiver(settings Settings, receiverConfig map[string]interface{}, c
return nil, fmt.Errorf("error creating processors: %w", err)
}

reg := monitoring.Default.GetRegistry(b.Info.Name)
if reg == nil {
reg = monitoring.Default.NewRegistry(b.Info.Name)
}

// This should be replaced with static config for otel consumer
// but need to figure out if we want the Queue settings from here.
outputEnabled := b.Config.Output.IsSet() && b.Config.Output.Config().Enabled()
Expand All @@ -485,12 +482,14 @@ func NewBeatReceiver(settings Settings, receiverConfig map[string]interface{}, c
}
}

tel := reg.GetRegistry("state")
uniq_reg := b.Beat.Info.Monitoring.Namespace.GetRegistry()

tel := uniq_reg.GetRegistry("state")
if tel == nil {
tel = reg.NewRegistry("state")
tel = uniq_reg.NewRegistry("state")
}
monitors := pipeline.Monitors{
Metrics: reg,
Metrics: uniq_reg,
Telemetry: tel,
Logger: logp.L().Named("publisher"),
Tracer: b.Instrumentation.Tracer(),
Expand All @@ -510,7 +509,6 @@ func NewBeatReceiver(settings Settings, receiverConfig map[string]interface{}, c
b.Publisher = publisher

return b, nil

}

// InitWithSettings does initialization of things common to all actions (read confs, flags)
Expand Down Expand Up @@ -831,11 +829,27 @@ func (b *Beat) RegisterHostname(useFQDN bool) {
hostname := b.Info.FQDNAwareHostname(useFQDN)

// info.hostname
infoRegistry := monitoring.GetNamespace("info").GetRegistry()
var infoRegistry *monitoring.Registry
if b.Info.Monitoring.Namespace != nil {
infoRegistry = b.Info.Monitoring.Namespace.GetRegistry().GetRegistry("info")
if infoRegistry == nil {
infoRegistry = b.Info.Monitoring.Namespace.GetRegistry().NewRegistry("info")
}
} else {
infoRegistry = monitoring.GetNamespace("info").GetRegistry()
}
monitoring.NewString(infoRegistry, "hostname").Set(hostname)

// state.host
stateRegistry := monitoring.GetNamespace("state").GetRegistry()
var stateRegistry *monitoring.Registry
if b.Info.Monitoring.Namespace != nil {
stateRegistry = b.Info.Monitoring.Namespace.GetRegistry().GetRegistry("state")
if stateRegistry == nil {
stateRegistry = b.Info.Monitoring.Namespace.GetRegistry().NewRegistry("state")
}
} else {
stateRegistry = monitoring.GetNamespace("state").GetRegistry()
}
monitoring.NewFunc(stateRegistry, "host", host.ReportInfo(hostname), monitoring.Report)
}

Expand Down
10 changes: 5 additions & 5 deletions x-pack/filebeat/input/benchmark/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (bi *benchmarkInput) Test(ctx v2.TestContext) error {
// Run starts the data generation.
func (bi *benchmarkInput) Run(ctx v2.Context, publisher stateless.Publisher) error {
var wg sync.WaitGroup
metrics := newInputMetrics(ctx.ID)
metrics := newInputMetrics(ctx)

for i := uint8(0); i < bi.cfg.Threads; i++ {
wg.Add(1)
Expand Down Expand Up @@ -103,8 +103,8 @@ func runThread(ctx v2.Context, publisher stateless.Publisher, thread uint8, cfg
ticker.Stop()
return
case <-ticker.C:
//don't want to block on filling doPublish channel
//so only send as many as it can hold right now
// don't want to block on filling doPublish channel
// so only send as many as it can hold right now
numToSend := cap(pubChan) - len(pubChan)
for i := 0; i < numToSend; i++ {
pubChan <- true
Expand Down Expand Up @@ -157,8 +157,8 @@ type inputMetrics struct {
}

// newInputMetrics returns an input metric for the benchmark processor.
func newInputMetrics(id string) *inputMetrics {
reg, unreg := inputmon.NewInputRegistry(inputName, id, nil)
func newInputMetrics(ctx v2.Context) *inputMetrics {
reg, unreg := inputmon.NewInputRegistry(inputName, ctx.ID, ctx.Agent.Monitoring.Namespace.GetRegistry())
out := &inputMetrics{
unregister: unreg,
eventsPublished: monitoring.NewUint(reg, "events_published_total"),
Expand Down
23 changes: 11 additions & 12 deletions x-pack/heartbeat/scenarios/framework/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@ import (
beatversion "github.com/elastic/beats/v7/libbeat/version"
)

type ScenarioRun func(t *testing.T) (config mapstr.M, meta ScenarioRunMeta, close func(), err error)
type ScenarioRunMeta struct {
URL *url.URL
Status monitorstate.StateStatus
}
type (
ScenarioRun func(t *testing.T) (config mapstr.M, meta ScenarioRunMeta, close func(), err error)
ScenarioRunMeta struct {
URL *url.URL
Status monitorstate.StateStatus
}
)

type Scenario struct {
Name string
Expand Down Expand Up @@ -155,7 +157,6 @@ func NewScenarioDB() *ScenarioDB {
ByTag: map[string][]Scenario{},
All: []Scenario{},
}

}

func (sdb *ScenarioDB) Init() {
Expand Down Expand Up @@ -250,7 +251,9 @@ func runMonitorOnce(t *testing.T, monitorConfig mapstr.M, meta ScenarioRunMeta,

mIface, err := f.Create(pipe, conf)
require.NoError(t, err)
mtr.monitor = mIface.(*monitors.Monitor)
mon, ok := mIface.(*monitors.Monitor)
require.True(t, ok, "type assertion didn't succeed")
mtr.monitor = mon
require.NotNil(t, mtr.monitor, "could not convert to monitor %v", mIface)
mtr.Events = pipe.PublishedEvents

Expand Down Expand Up @@ -281,12 +284,8 @@ func setupFactoryAndSched(location *hbconfig.LocationWithID, stateLoader monitor
EphemeralID: eid,
FirstStart: time.Now(),
StartTime: time.Now(),
Monitoring: struct {
DefaultUsername string
}{
DefaultUsername: "test",
},
}
info.Monitoring.DefaultUsername = "test"

sched = scheduler.Create(
1,
Expand Down
Loading