From db17c03351cf79ecf67ca234a466cbeca3fa72d8 Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Mon, 27 Jul 2015 16:38:11 -0400 Subject: [PATCH] consumer: fix another race pointed out by Maxim Take the previous refactor to its logical conclusion by handling *all* the error logic in the brokerConsumer, not the responseFeeder. This fixes the race to close the dying channel (since the brokerConsumer can just close the trigger instead as it has ownership). At the same time, refactor `updateSubscriptionCache` into `handleResponses`, and inline the "new subscriptions" bit into the main loop; otherwise we end up processing the previous iteration's results at the very beginning of the next iteration, rather than at the very end of the current one. --- consumer.go | 61 +++++++++++++++++++++++++++-------------------------- 1 file changed, 31 insertions(+), 30 deletions(-) diff --git a/consumer.go b/consumer.go index 8a3ca6f02..20adc862d 100644 --- a/consumer.go +++ b/consumer.go @@ -271,7 +271,7 @@ type partitionConsumer struct { feeder chan *FetchResponse trigger, dying chan none - dispatchReason error + responseResult error fetchSize int32 offset int64 @@ -402,24 +402,7 @@ func (child *partitionConsumer) HighWaterMarkOffset() int64 { func (child *partitionConsumer) responseFeeder() { for response := range child.feeder { - switch err := child.handleResponse(response); err { - case nil: - break - case ErrOffsetOutOfRange: - // there's no point in retrying this it will just fail the same way again - // so shut it down and force the user to choose what to do - Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, err) - child.sendError(err) - child.AsyncClose() - case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable: - // these three are not fatal errors, but do require redispatching - child.dispatchReason = err - default: - // dunno, tell the user and try redispatching - child.sendError(err) - child.dispatchReason = 
err - } - + child.responseResult = child.handleResponse(response) child.broker.acks.Done() } @@ -569,7 +552,10 @@ func (bc *brokerConsumer) subscriptionConsumer() { // the subscriptionConsumer ensures we will get nil right away if no new subscriptions is available for newSubscriptions := range bc.newSubscriptions { - bc.updateSubscriptionCache(newSubscriptions) + for _, child := range newSubscriptions { + bc.subscriptions[child] = none{} + Logger.Printf("consumer/broker/%d added subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition) + } if len(bc.subscriptions) == 0 { // We're about to be shut down or we're about to receive more subscriptions. @@ -591,16 +577,12 @@ func (bc *brokerConsumer) subscriptionConsumer() { child.feeder <- response } bc.acks.Wait() + bc.handleResponses() } } -func (bc *brokerConsumer) updateSubscriptionCache(newSubscriptions []*partitionConsumer) { - // take new subscriptions, and abandon subscriptions that have been closed - for _, child := range newSubscriptions { - bc.subscriptions[child] = none{} - Logger.Printf("consumer/broker/%d added subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition) - } - +func (bc *brokerConsumer) handleResponses() { + // handles the response codes left for us by our subscriptions, and abandons ones that have been closed for child := range bc.subscriptions { select { case <-child.dying: @@ -608,13 +590,32 @@ func (bc *brokerConsumer) updateSubscriptionCache(newSubscriptions []*partitionC close(child.trigger) delete(bc.subscriptions, child) default: - if child.dispatchReason != nil { + switch child.responseResult { + case nil: + break + case ErrOffsetOutOfRange: + // there's no point in retrying this it will just fail the same way again + // shut it down and force the user to choose what to do + child.sendError(child.responseResult) + Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, child.responseResult) + close(child.trigger) + 
delete(bc.subscriptions, child) + case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable: + // not an error, but does need redispatching + Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n", + bc.broker.ID(), child.topic, child.partition, child.responseResult) + child.trigger <- none{} + delete(bc.subscriptions, child) + default: + // dunno, tell the user and try redispatching + child.sendError(child.responseResult) Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n", - bc.broker.ID(), child.topic, child.partition, child.dispatchReason) - child.dispatchReason = nil + bc.broker.ID(), child.topic, child.partition, child.responseResult) child.trigger <- none{} delete(bc.subscriptions, child) } + + child.responseResult = nil } } }