Skip to content

Commit

Permalink
Cherry-pick #12063 to 6.7: Fix memory leak in Filebeat pipeline acker (
Browse files Browse the repository at this point in the history
…#12139)

* Fix memory leak in Filebeat pipeline acker (#12063)

* Fix memory leak in Filebeat pipeline acker

Before this change acker goroutine was kept forever as processed events
count was not correctly updated.

Filebeat sends an empty event to update file states, this event is not
published, but treated as dropped, without updating counters.

This change makes sures that `a.events` count gets updated for dropped
events also, so the acker gets closed after all ACKs happen.

(cherry picked from commit 9653105)

* Update CHANGELOG.next.asciidoc
  • Loading branch information
Carlos Pérez-Aradros Herce authored May 10, 2019
1 parent 5c49f28 commit db530d7
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 1 deletion.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ https://github.com/elastic/beats/compare/v6.7.2...6.x[Check the HEAD diff]

*Filebeat*

- Fix memory leak in Filebeat pipeline acker. {pull}12063[12063]

*Heartbeat*

*Journalbeat*
Expand Down
9 changes: 8 additions & 1 deletion libbeat/publisher/pipeline/acker.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,19 +139,26 @@ func (a *gapCountACK) ackLoop() {
case <-a.done:
closing = true
a.done = nil
if a.events.Load() == 0 {
// stop worker, if all events accounted for have been ACKed.
// If new events are added after this acker won't handle them, which may
// result in duplicates
return
}

case <-a.pipeline.ackDone:
return

case n := <-acks:
empty := a.handleACK(n)
if empty && closing && a.events.Load() == 0 {
// stop worker, iff all events accounted for have been ACKed
// stop worker, if and only if all events accounted for have been ACKed
return
}

case <-drop:
// TODO: accumulate multiple drop events + flush count with timer
a.events.Sub(1)
a.fn(1, 0)
}
}
Expand Down

0 comments on commit db530d7

Please sign in to comment.