Skip to content

Commit

Permalink
filebeat/input/{tcp,udp}: relax requirements that proc entries be pre…
Browse files Browse the repository at this point in the history
…sent when an address is

The previous logic required that if an address is present according to
net.LookupIP, then it must be present in the /proc/net entries. This may
not the case when a tcp/udp listener is created without specifying
tcp4/udp4 for an IPv4 host address and there is an expectation of
finding the socket in the /proc/net/{tcp,udp} table. So only complain if
the entry has ever been found and never skip storing a metric even when
there is a legitimate reason to expect its presence — because it has
been seen in the past. This second part is an extension to reduce the
loss of metric data, even if it is only partial.
  • Loading branch information
efd6 committed Jan 24, 2024
1 parent ebd8512 commit 90dd513
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 18 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ Setting environmental variable ELASTIC_NETINFO:false in Elastic Agent pod will d
- Update CEL extensions library to v1.7.0. {pull}37172[37172]
- Add support for complete URL replacement in HTTPJSON chain steps. {pull}37486[37486]
- Add support for user-defined query selection in EntraID entity analytics provider. {pull}37653[37653]
- Relax TCP/UDP metric polling expectations to improve metric collection. {pull}37714[37714]

*Auditbeat*

Expand Down
35 changes: 27 additions & 8 deletions filebeat/input/tcp/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,31 +238,50 @@ func (m *inputMetrics) poll(addr, addr6 []string, each time.Duration, log *logp.
// base level for the rx_queue values and ensures that if the
// constructed address values are malformed we panic early
// within the period of system testing.
want4 := true
rx, err := procNetTCP("/proc/net/tcp", addr, hasUnspecified, addrIsUnspecified)
if err != nil {
log.Warnf("failed to get initial tcp stats from /proc: %v", err)
want4 = false
log.Infof("did not get initial tcp stats from /proc: %v", err)
}
want6 := true
rx6, err := procNetTCP("/proc/net/tcp6", addr6, hasUnspecified6, addrIsUnspecified6)
if err != nil {
log.Warnf("failed to get initial tcp6 stats from /proc: %v", err)
want6 = false
log.Infof("did not get initial tcp6 stats from /proc: %v", err)
}
if !want4 && !want6 {
log.Warnf("failed to get initial tcp or tcp6 stats from /proc: %v", err)
} else {
m.rxQueue.Set(uint64(rx + rx6))
}
m.rxQueue.Set(uint64(rx + rx6))

t := time.NewTicker(each)
for {
select {
case <-t.C:
var found bool
rx, err := procNetTCP("/proc/net/tcp", addr, hasUnspecified, addrIsUnspecified)
if err != nil {
log.Warnf("failed to get tcp stats from /proc: %v", err)
continue
if want4 {
log.Warnf("failed to get tcp stats from /proc: %v", err)
}
} else {
found = true
want4 = true
}
rx6, err := procNetTCP("/proc/net/tcp6", addr6, hasUnspecified6, addrIsUnspecified6)
if err != nil {
log.Warnf("failed to get tcp6 stats from /proc: %v", err)
continue
if want6 {
log.Warnf("failed to get tcp6 stats from /proc: %v", err)
}
} else {
found = true
want6 = true
}
if found {
m.rxQueue.Set(uint64(rx + rx6))
}
m.rxQueue.Set(uint64(rx + rx6))
case <-m.done:
t.Stop()
return
Expand Down
39 changes: 29 additions & 10 deletions filebeat/input/udp/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,33 +231,52 @@ func (m *inputMetrics) poll(addr, addr6 []string, each time.Duration, log *logp.
// base level for the rx_queue and drops values and ensures that
// if the constructed address values are malformed we panic early
// within the period of system testing.
want4 := true
rx, drops, err := procNetUDP("/proc/net/udp", addr, hasUnspecified, addrIsUnspecified)
if err != nil {
log.Warnf("failed to get initial udp stats from /proc: %v", err)
want4 = false
log.Infof("did not get initial udp stats from /proc: %v", err)
}
want6 := true
rx6, drops6, err := procNetUDP("/proc/net/udp6", addr6, hasUnspecified6, addrIsUnspecified6)
if err != nil {
log.Warnf("failed to get initial udp6 stats from /proc: %v", err)
want6 = false
log.Infof("did not get initial udp6 stats from /proc: %v", err)
}
if !want4 && !want6 {
log.Warnf("failed to get initial udp or udp6 stats from /proc: %v", err)
} else {
m.rxQueue.Set(uint64(rx + rx6))
m.drops.Set(uint64(drops + drops6))
}
m.rxQueue.Set(uint64(rx + rx6))
m.drops.Set(uint64(drops + drops6))

t := time.NewTicker(each)
for {
select {
case <-t.C:
var found bool
rx, drops, err := procNetUDP("/proc/net/udp", addr, hasUnspecified, addrIsUnspecified)
if err != nil {
log.Warnf("failed to get udp stats from /proc: %v", err)
continue
if want4 {
log.Warnf("failed to get udp stats from /proc: %v", err)
}
} else {
found = true
want4 = true
}
rx6, drops6, err := procNetUDP("/proc/net/udp6", addr6, hasUnspecified6, addrIsUnspecified6)
if err != nil {
log.Warnf("failed to get udp6 stats from /proc: %v", err)
continue
if want6 {
log.Warnf("failed to get udp6 stats from /proc: %v", err)
}
} else {
found = true
want6 = true
}
if found {
m.rxQueue.Set(uint64(rx + rx6))
m.drops.Set(uint64(drops + drops6))
}
m.rxQueue.Set(uint64(rx + rx6))
m.drops.Set(uint64(drops + drops6))
case <-m.done:
t.Stop()
return
Expand Down

0 comments on commit 90dd513

Please sign in to comment.