From 9301d01ff01cdd3f9e47fd45bf36ac9e84f6d25b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carlos=20P=C3=A9rez-Aradros=20Herce?= Date: Fri, 10 May 2019 13:10:30 +0200 Subject: [PATCH] Cherry-pick #12063 to 6.7: Fix memory leak in Filebeat pipeline acker (#12139) * Fix memory leak in Filebeat pipeline acker (#12063) * Fix memory leak in Filebeat pipeline acker Before this change acker goroutine was kept forever as processed events count was not correctly updated. Filebeat sends an empty event to update file states, this event is not published, but treated as dropped, without updating counters. This change makes sures that `a.events` count gets updated for dropped events also, so the acker gets closed after all ACKs happen. (cherry picked from commit 9dc1f39eb25f7bba48b3ae835df93cd7e0a3ab00) * Update CHANGELOG.next.asciidoc --- CHANGELOG.next.asciidoc | 2 ++ libbeat/publisher/pipeline/acker.go | 9 ++++++++- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 658b3bba01d..229e70ed43f 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -37,6 +37,8 @@ https://github.com/elastic/beats/compare/v6.7.2...6.x[Check the HEAD diff] *Filebeat* +- Fix memory leak in Filebeat pipeline acker. {pull}12063[12063] + *Heartbeat* *Journalbeat* diff --git a/libbeat/publisher/pipeline/acker.go b/libbeat/publisher/pipeline/acker.go index 1d4155677ac..5689b86d445 100644 --- a/libbeat/publisher/pipeline/acker.go +++ b/libbeat/publisher/pipeline/acker.go @@ -139,6 +139,12 @@ func (a *gapCountACK) ackLoop() { case <-a.done: closing = true a.done = nil + if a.events.Load() == 0 { + // stop worker, if all events accounted for have been ACKed. + // If new events are added after this acker won't handle them, which may + // result in duplicates + return + } case <-a.pipeline.ackDone: return @@ -146,12 +152,13 @@ func (a *gapCountACK) ackLoop() { case n := <-acks: empty := a.handleACK(n) if empty && closing && a.events.Load() == 0 { - // stop worker, iff all events accounted for have been ACKed + // stop worker, if and only if all events accounted for have been ACKed return } case <-drop: // TODO: accumulate multiple drop events + flush count with timer + a.events.Sub(1) a.fn(1, 0) } }