Skip to content

Commit

Permalink
fix erronous errTimedOut ("abandoned subscription...because consuming…
Browse files Browse the repository at this point in the history
… was taking too long")

The expiryTimer continues to run after msg is delivered to
child.messages. If <-child.feeder takes > MaxProcessingTime (which
depends on how fast the broker runs), the expiryTimer can expire
before we Reset() it. If this happens there is a message waiting
in expiryTimer.C which, if we don't clear it out, makes us think
the consumer stalled.

This is reproduced with a high traffic partition on a busy
broker, and measuring the time each line of the loop takes. The
>100ms stall is the <-child.feeder inside 'range child.feeder'.
  • Loading branch information
nsd20463 committed Aug 16, 2016
1 parent f4a6263 commit 906ed72
Showing 1 changed file with 4 additions and 1 deletion.
5 changes: 4 additions & 1 deletion consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,10 @@ feederLoop:
msgs, child.responseResult = child.parseResponse(response)

for i, msg := range msgs {
expiryTimer.Reset(child.conf.Consumer.MaxProcessingTime)
if !expiryTimer.Reset(child.conf.Consumer.MaxProcessingTime) {
// expiryTimer was expired; clear out the waiting msg
<-expiryTimer.C
}

select {
case child.messages <- msg:
Expand Down

0 comments on commit 906ed72

Please sign in to comment.