From 906ed729ae756fc01e0c9c9d78b31a2840022022 Mon Sep 17 00:00:00 2001
From: "Nicolas S. Dade"
Date: Mon, 15 Aug 2016 17:09:01 -0700
Subject: [PATCH] fix erroneous errTimedOut ("abandoned subscription...because consuming was taking too long")

The expiryTimer continues to run after msg is delivered to
child.messages. If <-child.feeder takes > MaxProcessingTime (which
depends on how fast the broker runs), the expiryTimer can expire before
we Reset() it. If this happens, there is a message waiting in
expiryTimer.C which, if we don't clear it out, makes us think the
consumer stalled.

This can be reproduced with a high-traffic partition on a busy broker by
measuring the time each line of the loop takes. The >100ms stall is the
<-child.feeder inside 'range child.feeder'.
---
 consumer.go | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/consumer.go b/consumer.go
index c70b528f0..3e6648303 100644
--- a/consumer.go
+++ b/consumer.go
@@ -420,7 +420,10 @@ feederLoop:
 			msgs, child.responseResult = child.parseResponse(response)
 			for i, msg := range msgs {
-				expiryTimer.Reset(child.conf.Consumer.MaxProcessingTime)
+				if !expiryTimer.Reset(child.conf.Consumer.MaxProcessingTime) {
+					// expiryTimer was expired; clear out the waiting msg
+					<-expiryTimer.C
+				}
 				select {
 				case child.messages <- msg: