Skip to content

Commit

Permalink
Add queue percentage to libbeat metrics (elastic#39205)
Browse files Browse the repository at this point in the history
* add queue full percentage metric

* newline

* add div by zero check

* change name

* linter

* fix gauge settings

* linter...

* change name

* set percentage when we set queue max

* change name

* round numbers
  • Loading branch information
fearful-symmetry authored Apr 30, 2024
1 parent 8c48989 commit 562e48e
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 36 deletions.
66 changes: 34 additions & 32 deletions libbeat/monitoring/report/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,34 +37,36 @@ import (
// TODO: Replace this with a proper solution that uses the metric type from
// where it is defined. See: https://github.com/elastic/beats/issues/5433
var gauges = map[string]bool{
"libbeat.output.events.active": true,
"libbeat.pipeline.events.active": true,
"libbeat.pipeline.clients": true,
"libbeat.config.module.running": true,
"registrar.states.current": true,
"filebeat.events.active": true,
"filebeat.harvester.running": true,
"filebeat.harvester.open_files": true,
"beat.memstats.memory_total": true,
"beat.memstats.memory_alloc": true,
"beat.memstats.rss": true,
"beat.memstats.gc_next": true,
"beat.info.uptime.ms": true,
"beat.cgroup.memory.mem.usage.bytes": true,
"beat.cpu.user.ticks": true,
"beat.cpu.system.ticks": true,
"beat.cpu.total.value": true,
"beat.cpu.total.ticks": true,
"beat.handles.open": true,
"beat.handles.limit.hard": true,
"beat.handles.limit.soft": true,
"beat.runtime.goroutines": true,
"system.load.1": true,
"system.load.5": true,
"system.load.15": true,
"system.load.norm.1": true,
"system.load.norm.5": true,
"system.load.norm.15": true,
"libbeat.output.events.active": true,
"libbeat.pipeline.events.active": true,
"libbeat.pipeline.clients": true,
"libbeat.pipeline.queue.max_events": true,
"libbeat.pipeline.queue.filled.pct.events": true,
"libbeat.config.module.running": true,
"registrar.states.current": true,
"filebeat.events.active": true,
"filebeat.harvester.running": true,
"filebeat.harvester.open_files": true,
"beat.memstats.memory_total": true,
"beat.memstats.memory_alloc": true,
"beat.memstats.rss": true,
"beat.memstats.gc_next": true,
"beat.info.uptime.ms": true,
"beat.cgroup.memory.mem.usage.bytes": true,
"beat.cpu.user.ticks": true,
"beat.cpu.system.ticks": true,
"beat.cpu.total.value": true,
"beat.cpu.total.ticks": true,
"beat.handles.open": true,
"beat.handles.limit.hard": true,
"beat.handles.limit.soft": true,
"beat.runtime.goroutines": true,
"system.load.1": true,
"system.load.5": true,
"system.load.15": true,
"system.load.norm.1": true,
"system.load.norm.5": true,
"system.load.norm.15": true,
}

// isGauge returns true when the given metric key name represents a gauge value.
Expand Down Expand Up @@ -249,16 +251,16 @@ func toKeyValuePairs(snaps map[string]monitoring.FlatSnapshot) []interface{} {
for name, snap := range snaps {
data := make(mapstr.M, snapshotLen(snap))
for k, v := range snap.Bools {
data.Put(k, v) //nolint:errcheck // All keys within the flat snapshot are unique and are for scalar values.
data.Put(k, v)
}
for k, v := range snap.Floats {
data.Put(k, v) //nolint:errcheck // All keys within the flat snapshot are unique and are for scalar values.
data.Put(k, v)
}
for k, v := range snap.Ints {
data.Put(k, v) //nolint:errcheck // All keys within the flat snapshot are unique and are for scalar values.
data.Put(k, v)
}
for k, v := range snap.Strings {
data.Put(k, v) //nolint:errcheck // All keys within the flat snapshot are unique and are for scalar values.
data.Put(k, v)
}
if len(data) > 0 {
args = append(args, logp.Reflect(name, data))
Expand Down
29 changes: 25 additions & 4 deletions libbeat/publisher/pipeline/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@

package pipeline

import "github.com/elastic/elastic-agent-libs/monitoring"
import (
"math"

"github.com/elastic/elastic-agent-libs/monitoring"
)

type observer interface {
pipelineObserver
Expand Down Expand Up @@ -67,8 +71,9 @@ type metricsObserverVars struct {
activeEvents *monitoring.Uint

// queue metrics
queueACKed *monitoring.Uint
queueMaxEvents *monitoring.Uint
queueACKed *monitoring.Uint
queueMaxEvents *monitoring.Uint
percentQueueFull *monitoring.Float
}

func newMetricsObserver(metrics *monitoring.Registry) *metricsObserver {
Expand All @@ -92,7 +97,8 @@ func newMetricsObserver(metrics *monitoring.Registry) *metricsObserver {
queueACKed: monitoring.NewUint(reg, "queue.acked"),
queueMaxEvents: monitoring.NewUint(reg, "queue.max_events"),

activeEvents: monitoring.NewUint(reg, "events.active"), // Gauge
activeEvents: monitoring.NewUint(reg, "events.active"), // Gauge
percentQueueFull: monitoring.NewFloat(reg, "queue.filled.pct.events"),
},
}
}
Expand Down Expand Up @@ -121,12 +127,24 @@ func (o *metricsObserver) clientClosed() { o.vars.clients.Dec() }
func (o *metricsObserver) newEvent() {
o.vars.events.Inc()
o.vars.activeEvents.Inc()
o.setPercentageFull()
}

// setPercentageFull is used interally to set the `queue.full` metric
func (o *metricsObserver) setPercentageFull() {
maxEvt := o.vars.queueMaxEvents.Get()
if maxEvt != 0 {
pct := float64(o.vars.activeEvents.Get()) / float64(maxEvt)
pctRound := math.Round(pct/0.0005) * 0.0005
o.vars.percentQueueFull.Set(pctRound)
}
}

// (client) event is filtered out (on purpose or failed)
func (o *metricsObserver) filteredEvent() {
o.vars.filtered.Inc()
o.vars.activeEvents.Dec()
o.setPercentageFull()
}

// (client) managed to push an event into the publisher pipeline
Expand All @@ -138,6 +156,7 @@ func (o *metricsObserver) publishedEvent() {
func (o *metricsObserver) failedPublishEvent() {
o.vars.failed.Inc()
o.vars.activeEvents.Dec()
o.setPercentageFull()
}

//
Expand All @@ -148,11 +167,13 @@ func (o *metricsObserver) failedPublishEvent() {
func (o *metricsObserver) queueACKed(n int) {
o.vars.queueACKed.Add(uint64(n))
o.vars.activeEvents.Sub(uint64(n))
o.setPercentageFull()
}

// (queue) maximum queue event capacity
func (o *metricsObserver) queueMaxEvents(n int) {
o.vars.queueMaxEvents.Set(uint64(n))
o.setPercentageFull()
}

//
Expand Down

0 comments on commit 562e48e

Please sign in to comment.