diff --git a/consumer.go b/consumer.go index 8a3ca6f02..20adc862d 100644 --- a/consumer.go +++ b/consumer.go @@ -271,7 +271,7 @@ type partitionConsumer struct { feeder chan *FetchResponse trigger, dying chan none - dispatchReason error + responseResult error fetchSize int32 offset int64 @@ -402,24 +402,7 @@ func (child *partitionConsumer) HighWaterMarkOffset() int64 { func (child *partitionConsumer) responseFeeder() { for response := range child.feeder { - switch err := child.handleResponse(response); err { - case nil: - break - case ErrOffsetOutOfRange: - // there's no point in retrying this it will just fail the same way again - // so shut it down and force the user to choose what to do - Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, err) - child.sendError(err) - child.AsyncClose() - case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable: - // these three are not fatal errors, but do require redispatching - child.dispatchReason = err - default: - // dunno, tell the user and try redispatching - child.sendError(err) - child.dispatchReason = err - } - + child.responseResult = child.handleResponse(response) child.broker.acks.Done() } @@ -569,7 +552,10 @@ func (bc *brokerConsumer) subscriptionConsumer() { // the subscriptionConsumer ensures we will get nil right away if no new subscriptions is available for newSubscriptions := range bc.newSubscriptions { - bc.updateSubscriptionCache(newSubscriptions) + for _, child := range newSubscriptions { + bc.subscriptions[child] = none{} + Logger.Printf("consumer/broker/%d added subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition) + } if len(bc.subscriptions) == 0 { // We're about to be shut down or we're about to receive more subscriptions. @@ -591,16 +577,12 @@ func (bc *brokerConsumer) subscriptionConsumer() { child.feeder <- response } bc.acks.Wait() + bc.handleResponses() } } -func (bc *brokerConsumer) updateSubscriptionCache(newSubscriptions []*partitionConsumer) { - // take new subscriptions, and abandon subscriptions that have been closed - for _, child := range newSubscriptions { - bc.subscriptions[child] = none{} - Logger.Printf("consumer/broker/%d added subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition) - } - +func (bc *brokerConsumer) handleResponses() { + // handles the response codes left for us by our subscriptions, and abandons ones that have been closed for child := range bc.subscriptions { select { case <-child.dying: @@ -608,13 +590,32 @@ func (bc *brokerConsumer) updateSubscriptionCache(newSubscriptions []*partitionC close(child.trigger) delete(bc.subscriptions, child) default: - if child.dispatchReason != nil { + switch child.responseResult { + case nil: + break + case ErrOffsetOutOfRange: + // there's no point in retrying this it will just fail the same way again + // shut it down and force the user to choose what to do + child.sendError(child.responseResult) + Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, child.responseResult) + close(child.trigger) + delete(bc.subscriptions, child) + case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable: + // not an error, but does need redispatching + Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n", + bc.broker.ID(), child.topic, child.partition, child.responseResult) + child.trigger <- none{} + delete(bc.subscriptions, child) + default: + // dunno, tell the user and try redispatching + child.sendError(child.responseResult) Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n", - bc.broker.ID(), child.topic, child.partition, child.dispatchReason) - child.dispatchReason = nil + bc.broker.ID(), child.topic, child.partition, child.responseResult) child.trigger <- none{} delete(bc.subscriptions, child) } + + child.responseResult = nil } } }