From 88e6d81674c835a00c7263e7790ac253d4755cac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carlos=20P=C3=A9rez-Aradros=20Herce?= Date: Tue, 7 May 2019 17:17:04 +0200 Subject: [PATCH] Fix memory leak in Filebeat pipeline acker (#12063) * Fix memory leak in Filebeat pipeline acker Before this change acker goroutine was kept forever as processed events count was not correctly updated. Filebeat sends an empty event to update file states, this event is not published, but treated as dropped, without updating counters. This change makes sures that `a.events` count gets updated for dropped events also, so the acker gets closed after all ACKs happen. (cherry picked from commit 965310573289806993cec957e5c9391bb3612e04) --- CHANGELOG.next.asciidoc | 10 ++++++++++ libbeat/publisher/pipeline/acker.go | 9 ++++++++- 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 658b3bba01d3..ffff813670bf 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -37,6 +37,16 @@ https://github.com/elastic/beats/compare/v6.7.2...6.x[Check the HEAD diff] *Filebeat* +- Add support for Cisco syslog format used by their switch. {pull}10760[10760] +- Cover empty request data, url and version in Apache2 module{pull}10730[10730] +- Fix registry entries not being cleaned due to race conditions. {pull}10747[10747] +- Improve detection of file deletion on Windows. {pull}10747[10747] +- Fix goroutine leak happening when harvesters are dynamically stopped. {pull}11263[11263] +- Fix `add_docker_metadata` source matching, using `log.file.path` field now. {pull}11577[11577] +- Add missing Kubernetes metadata fields to Filebeat CoreDNS module, and fix a documentation error. {pull}11591[11591] +- Reduce memory usage if long lines are truncated to fit `max_bytes` limit. The line buffer is copied into a smaller buffer now. This allows the runtime to release unused memory earlier. {pull}11524[11524] +- Fix memory leak in Filebeat pipeline acker. {pull}12063[12063] + *Heartbeat* *Journalbeat* diff --git a/libbeat/publisher/pipeline/acker.go b/libbeat/publisher/pipeline/acker.go index 1d4155677aca..5689b86d4458 100644 --- a/libbeat/publisher/pipeline/acker.go +++ b/libbeat/publisher/pipeline/acker.go @@ -139,6 +139,12 @@ func (a *gapCountACK) ackLoop() { case <-a.done: closing = true a.done = nil + if a.events.Load() == 0 { + // stop worker, if all events accounted for have been ACKed. + // If new events are added after this acker won't handle them, which may + // result in duplicates + return + } case <-a.pipeline.ackDone: return @@ -146,12 +152,13 @@ func (a *gapCountACK) ackLoop() { case n := <-acks: empty := a.handleACK(n) if empty && closing && a.events.Load() == 0 { - // stop worker, iff all events accounted for have been ACKed + // stop worker, if and only if all events accounted for have been ACKed return } case <-drop: // TODO: accumulate multiple drop events + flush count with timer + a.events.Sub(1) a.fn(1, 0) } }