Skip to content

Commit

Permalink
Fix duplicate batches in retry queue (#5520)
Browse files Browse the repository at this point in the history
On error in the Logstash output sender, failed batches might get enqueued two
times. This can lead to multiple resends and ACKs for the same events.

In filebeat/winlogbeat, waiting for ACK from output, at most one ACK is
required. With potentially multiple ACKs (especially with multiple
consecutive IO errors) a deadlock in the outputs ACK handler can occur.

This PR ensures batches can not be returned to the retry queue via 2 code
paths (remove race between competing workers):
- async output worker does not return events back into retry queue
- async clients are required to always report retrieable errors via
callbacks
- add some more detailed debug logs to the LS output, that can help in
  identifiying ACKed batches still being retried
  • Loading branch information
Steffen Siering authored and ruflin committed Nov 14, 2017
1 parent 38c2219 commit f806e3b
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 28 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ https://github.com/elastic/beats/compare/v5.6.4...5.6[Check the HEAD diff]

*Affecting all Beats*

- Fix duplicate batches of events in retry queue. {pull}5520[5520]

*Filebeat*

*Heartbeat*
Expand Down
43 changes: 33 additions & 10 deletions libbeat/outputs/logstash/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,7 @@ func (c *asyncClient) AsyncPublishEvents(
return nil
}

ref := &msgRef{
client: c,
count: 1,
batch: data,
batchSize: len(data),
win: c.win,
cb: cb,
err: nil,
}
ref := newMsgRef(c, data, cb)
defer ref.dec()

for len(data) > 0 {
Expand Down Expand Up @@ -171,7 +163,7 @@ func (c *asyncClient) sendEvents(ref *msgRef, data []outputs.Data) error {
for i, d := range data {
window[i] = d
}
atomic.AddInt32(&ref.count, 1)
ref.inc()
return c.client.Send(ref.callback, window)
}

Expand All @@ -183,7 +175,33 @@ func (r *msgRef) callback(seq uint32, err error) {
}
}

func newMsgRef(
client *asyncClient,
data []outputs.Data,
cb func([]outputs.Data, error),
) *msgRef {
r := &msgRef{
client: client,
count: 1,
batch: data,
batchSize: len(data),
win: client.win,
cb: cb,
err: nil,
}

debug("msgref(%p) new: batch=%p, cb=%p", r, &r.batch[0], cb)
return r
}

func (r *msgRef) inc() {
count := atomic.AddInt32(&r.count, 1)
debug("msgref(%p) inc -> %v", r, count)
}

func (r *msgRef) done(n uint32) {
debug("msgref(%p) done(%v)", r, n)

ackedEvents.Add(int64(n))
r.batch = r.batch[n:]
if r.win != nil {
Expand All @@ -193,6 +211,8 @@ func (r *msgRef) done(n uint32) {
}

func (r *msgRef) fail(n uint32, err error) {
debug("msgref(%p) fail(%v, %v)", r, n, err)

ackedEvents.Add(int64(n))
if r.err == nil {
r.err = err
Expand All @@ -206,6 +226,7 @@ func (r *msgRef) fail(n uint32, err error) {

func (r *msgRef) dec() {
i := atomic.AddInt32(&r.count, -1)
debug("msgref(%p) dec -> %v", r, i)
if i > 0 {
return
}
Expand All @@ -214,9 +235,11 @@ func (r *msgRef) dec() {
if err != nil {
eventsNotAcked.Add(int64(len(r.batch)))
logp.Err("Failed to publish events (host: %v) caused by: %v", r.client.host, err)
debug("msgref(%p) exec callback(%p, %v)", r, &r.batch[0], err)
r.cb(r.batch, err)
return
}

debug("msgref(%p) exec callback(nil, nil)", r)
r.cb(nil, nil)
}
21 changes: 3 additions & 18 deletions libbeat/outputs/mode/lb/async_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,25 +111,10 @@ func (w *asyncWorker) sendLoop() (done bool) {
}

func (w *asyncWorker) onMessage(msg eventsMessage) error {
var err error
if msg.datum.Event != nil {
err = w.client.AsyncPublishEvent(w.handleResult(msg), msg.datum)
} else {
err = w.client.AsyncPublishEvents(w.handleResults(msg), msg.data)
return w.client.AsyncPublishEvent(w.handleResult(msg), msg.datum)
}

if err != nil {
if msg.attemptsLeft > 0 {
msg.attemptsLeft--
}

// asynchronously retry to insert message (if attempts left), so worker can not
// deadlock on retries channel if client puts multiple failed outstanding
// events into the pipeline
w.onFail(msg, err)
}

return err
return w.client.AsyncPublishEvents(w.handleResults(msg), msg.data)
}

func (w *asyncWorker) handleResult(msg eventsMessage) func(error) {
Expand Down Expand Up @@ -193,7 +178,7 @@ func (w *asyncWorker) handleResults(msg eventsMessage) func([]outputs.Data, erro
}

// all events published -> signal success
debugf("async bulk publish success")
debugf("async bulk publish success (signaler=%v)", msg.signaler)
op.SigCompleted(msg.signaler)
}
}
Expand Down
1 change: 1 addition & 0 deletions libbeat/outputs/mode/modetest/callbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ func AsyncPublishFailStartWith(
inc := makeCounter(n, err)
return func(cb func([]outputs.Data, error), data []outputs.Data) error {
if err := inc(); err != nil {
cb(data, err)
return err
}
return pub(cb, data)
Expand Down

0 comments on commit f806e3b

Please sign in to comment.