diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 7298b388919..7568ddf7d9a 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -44,6 +44,7 @@ https://github.com/elastic/beats/compare/v6.7.2...6.8[Check the HEAD diff] - Fix goroutine leak happening when harvesters are dynamically stopped. {pull}11263[11263] - Fix initialization of the TCP input logger. {pull}11605[11605] +- Fix memory leak in Filebeat pipeline acker. {pull}12063[12063] *Heartbeat* diff --git a/libbeat/publisher/pipeline/acker.go b/libbeat/publisher/pipeline/acker.go index 1d4155677ac..5689b86d445 100644 --- a/libbeat/publisher/pipeline/acker.go +++ b/libbeat/publisher/pipeline/acker.go @@ -139,6 +139,12 @@ func (a *gapCountACK) ackLoop() { case <-a.done: closing = true a.done = nil + if a.events.Load() == 0 { + // stop worker, if all events accounted for have been ACKed. + // If new events are added after this acker won't handle them, which may + // result in duplicates + return + } case <-a.pipeline.ackDone: return @@ -146,12 +152,13 @@ func (a *gapCountACK) ackLoop() { case n := <-acks: empty := a.handleACK(n) if empty && closing && a.events.Load() == 0 { - // stop worker, iff all events accounted for have been ACKed + // stop worker, if and only if all events accounted for have been ACKed return } case <-drop: // TODO: accumulate multiple drop events + flush count with timer + a.events.Sub(1) a.fn(1, 0) } }