From f1c29097619308ee3ff10fbea90a29850338cc2b Mon Sep 17 00:00:00 2001 From: Dan Kortschak <90160302+efd6@users.noreply.github.com> Date: Thu, 25 Jan 2024 19:43:12 +1030 Subject: [PATCH] filebeat/input/{tcp,udp}: relax requirements that proc entries be present when an address is (#37714) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous logic required that if an address is present according to net.LookupIP, then it must be present in the /proc/net entries. This may not the case when a tcp/udp listener is created without specifying tcp4/udp4 for an IPv4 host address and there is an expectation of finding the socket in the /proc/net/{tcp,udp} table. So only complain if the entry has ever been found and never skip storing a metric even when there is a legitimate reason to expect its presence — because it has been seen in the past. This second part is an extension to reduce the loss of metric data, even if it is only partial. Also fix the base of the queue length parsers. This was incorrectly claimed to be decimal due to misreading the kernel source. (cherry picked from commit b09ac16b7b4cb448c623a0a052a40db1a76b950e) --- CHANGELOG.next.asciidoc | 22 +++++++++++++++++++ filebeat/input/tcp/input.go | 39 ++++++++++++++++++++++++--------- filebeat/input/udp/input.go | 43 ++++++++++++++++++++++++++----------- 3 files changed, 82 insertions(+), 22 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 7334d11e743f..64b883bfc4ea 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -70,6 +70,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Fix handling of Juniper SRX structured data when there is no leading junos element. {issue}36270[36270] {pull}36308[36308] - Fix Filebeat Cisco module with missing escape character {issue}36325[36325] {pull}36326[36326] - Added a fix for Crowdstrike pipeline handling process arrays {pull}36496[36496] +- Fix TCP/UDP metric queue length parsing base. {pull}37714[37714] *Heartbeat* @@ -135,6 +136,27 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Added support for Okta OAuth2 provider in the CEL input. {issue}36336[36336] {pull}36521[36521] - Added support for new features & removed partial save mechanism in the Azure Blob Storage input. {issue}35126[35126] {pull}36690[36690] - Added support for new features and removed partial save mechanism in the GCS input. {issue}35847[35847] {pull}36713[36713] +- Re-use buffers to optimise memory allocation in fingerprint mode of filestream {pull}36736[36736] +- Allow http_endpoint input to receive PUT and PATCH requests. {pull}36734[36734] +- Add cache processor. {pull}36786[36786] +- Avoid unwanted publication of Azure entity records. {pull}36753[36753] +- Avoid unwanted publication of Okta entity records. {pull}36770[36770] +- Add support for Digest Authentication to CEL input. {issue}35514[35514] {pull}36932[36932] +- Use filestream input with file_identity.fingerprint as default for hints autodiscover. {issue}35984[35984] {pull}36950[36950] +- Add network processor in addition to interface based direction resolution. {pull}37023[37023] +- Add setup option `--force-enable-module-filesets`, that will act as if all filesets have been enabled in a module during setup. {issue}30915[30915] {pull}99999[99999] +- Make CEL input log current transaction ID when request tracing is turned on. {pull}37065[37065] +- Made Azure Blob Storage input GA and updated docs accordingly. {pull}37128[37128] +- Add request trace logging to http_endpoint input. {issue}36951[36951] {pull}36957[36957] +- Made GCS input GA and updated docs accordingly. {pull}37127[37127] +- Suppress and log max HTTP request retry errors in CEL input. {pull}37160[37160] +- Prevent CEL input from re-entering the eval loop when an evaluation failed. {pull}37161[37161] +- Update CEL extensions library to v1.7.0. {pull}37172[37172] +- Add support for complete URL replacement in HTTPJSON chain steps. {pull}37486[37486] +- Add support for user-defined query selection in EntraID entity analytics provider. {pull}37653[37653] +- Update CEL extensions library to v1.8.0 to provide runtime error location reporting. {issue}37304[37304] {pull}37718[37718] +- Add request trace logging for chained API requests. {issue}37551[36551] {pull}37682[37682] +- Relax TCP/UDP metric polling expectations to improve metric collection. {pull}37714[37714] *Auditbeat* diff --git a/filebeat/input/tcp/input.go b/filebeat/input/tcp/input.go index 762c6b6ba355..1b3ffa7c2aa4 100644 --- a/filebeat/input/tcp/input.go +++ b/filebeat/input/tcp/input.go @@ -238,31 +238,50 @@ func (m *inputMetrics) poll(addr, addr6 []string, each time.Duration, log *logp. // base level for the rx_queue values and ensures that if the // constructed address values are malformed we panic early // within the period of system testing. + want4 := true rx, err := procNetTCP("/proc/net/tcp", addr, hasUnspecified, addrIsUnspecified) if err != nil { - log.Warnf("failed to get initial tcp stats from /proc: %v", err) + want4 = false + log.Infof("did not get initial tcp stats from /proc: %v", err) } + want6 := true rx6, err := procNetTCP("/proc/net/tcp6", addr6, hasUnspecified6, addrIsUnspecified6) if err != nil { - log.Warnf("failed to get initial tcp6 stats from /proc: %v", err) + want6 = false + log.Infof("did not get initial tcp6 stats from /proc: %v", err) + } + if !want4 && !want6 { + log.Warnf("failed to get initial tcp or tcp6 stats from /proc: %v", err) + } else { + m.rxQueue.Set(uint64(rx + rx6)) } - m.rxQueue.Set(uint64(rx + rx6)) t := time.NewTicker(each) for { select { case <-t.C: + var found bool rx, err := procNetTCP("/proc/net/tcp", addr, hasUnspecified, addrIsUnspecified) if err != nil { - log.Warnf("failed to get tcp stats from /proc: %v", err) - continue + if want4 { + log.Warnf("failed to get tcp stats from /proc: %v", err) + } + } else { + found = true + want4 = true } rx6, err := procNetTCP("/proc/net/tcp6", addr6, hasUnspecified6, addrIsUnspecified6) if err != nil { - log.Warnf("failed to get tcp6 stats from /proc: %v", err) - continue + if want6 { + log.Warnf("failed to get tcp6 stats from /proc: %v", err) + } + } else { + found = true + want6 = true + } + if found { + m.rxQueue.Set(uint64(rx + rx6)) } - m.rxQueue.Set(uint64(rx + rx6)) case <-m.done: t.Stop() return @@ -323,10 +342,10 @@ func procNetTCP(path string, addr []string, hasUnspecified bool, addrIsUnspecifi } found = true - // queue lengths are decimal, e.g.: + // queue lengths are hex, e.g.: // - https://elixir.bootlin.com/linux/v6.2.11/source/net/ipv4/tcp_ipv4.c#L2643 // - https://elixir.bootlin.com/linux/v6.2.11/source/net/ipv6/tcp_ipv6.c#L1987 - v, err := strconv.ParseInt(string(r), 10, 64) + v, err := strconv.ParseInt(string(r), 16, 64) if err != nil { return 0, fmt.Errorf("failed to parse rx_queue: %w", err) } diff --git a/filebeat/input/udp/input.go b/filebeat/input/udp/input.go index 831fb41c2ee6..cd7ca0c56051 100644 --- a/filebeat/input/udp/input.go +++ b/filebeat/input/udp/input.go @@ -231,33 +231,52 @@ func (m *inputMetrics) poll(addr, addr6 []string, each time.Duration, log *logp. // base level for the rx_queue and drops values and ensures that // if the constructed address values are malformed we panic early // within the period of system testing. + want4 := true rx, drops, err := procNetUDP("/proc/net/udp", addr, hasUnspecified, addrIsUnspecified) if err != nil { - log.Warnf("failed to get initial udp stats from /proc: %v", err) + want4 = false + log.Infof("did not get initial udp stats from /proc: %v", err) } + want6 := true rx6, drops6, err := procNetUDP("/proc/net/udp6", addr6, hasUnspecified6, addrIsUnspecified6) if err != nil { - log.Warnf("failed to get initial udp6 stats from /proc: %v", err) + want6 = false + log.Infof("did not get initial udp6 stats from /proc: %v", err) + } + if !want4 && !want6 { + log.Warnf("failed to get initial udp or udp6 stats from /proc: %v", err) + } else { + m.rxQueue.Set(uint64(rx + rx6)) + m.drops.Set(uint64(drops + drops6)) } - m.rxQueue.Set(uint64(rx + rx6)) - m.drops.Set(uint64(drops + drops6)) t := time.NewTicker(each) for { select { case <-t.C: + var found bool rx, drops, err := procNetUDP("/proc/net/udp", addr, hasUnspecified, addrIsUnspecified) if err != nil { - log.Warnf("failed to get udp stats from /proc: %v", err) - continue + if want4 { + log.Warnf("failed to get udp stats from /proc: %v", err) + } + } else { + found = true + want4 = true } rx6, drops6, err := procNetUDP("/proc/net/udp6", addr6, hasUnspecified6, addrIsUnspecified6) if err != nil { - log.Warnf("failed to get udp6 stats from /proc: %v", err) - continue + if want6 { + log.Warnf("failed to get udp6 stats from /proc: %v", err) + } + } else { + found = true + want6 = true + } + if found { + m.rxQueue.Set(uint64(rx + rx6)) + m.drops.Set(uint64(drops + drops6)) } - m.rxQueue.Set(uint64(rx + rx6)) - m.drops.Set(uint64(drops + drops6)) case <-m.done: t.Stop() return @@ -321,10 +340,10 @@ func procNetUDP(path string, addr []string, hasUnspecified bool, addrIsUnspecifi } found = true - // queue lengths and drops are decimal, e.g.: + // queue lengths and drops are hex, e.g.: // - https://elixir.bootlin.com/linux/v6.2.11/source/net/ipv4/udp.c#L3110 // - https://elixir.bootlin.com/linux/v6.2.11/source/net/ipv6/datagram.c#L1048 - v, err := strconv.ParseInt(string(r), 10, 64) + v, err := strconv.ParseInt(string(r), 16, 64) if err != nil { return 0, 0, fmt.Errorf("failed to parse rx_queue: %w", err) }