Skip to content

Commit

Permalink
Merge pull request #385 from Shopify/consumer-logging
Browse files Browse the repository at this point in the history
Add better logs to the consumer
  • Loading branch information
eapache committed Mar 23, 2015
2 parents 909b5ca + 80a846d commit 94a77f7
Showing 1 changed file with 5 additions and 1 deletion.
6 changes: 5 additions & 1 deletion consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ func (child *partitionConsumer) dispatcher() {
child.broker = nil
}

Logger.Printf("consumer/%s/%d finding new broker\n", child.topic, child.partition)
if err := child.dispatch(); err != nil {
child.sendError(err)
child.trigger <- none{}
Expand Down Expand Up @@ -435,7 +436,7 @@ func (w *brokerConsumer) subscriptionConsumer() {
response, err := w.fetchNewMessages()

if err != nil {
Logger.Printf("Unexpected error processing FetchRequest; disconnecting from broker %s: %s\n", w.broker.addr, err)
Logger.Printf("consumer/broker/%d disconnecting due to error processing FetchRequest: %s\n", w.broker.ID(), err)
w.abort(err)
return
}
Expand All @@ -450,6 +451,7 @@ func (w *brokerConsumer) subscriptionConsumer() {
// these three are not fatal errors, but do require redispatching
child.trigger <- none{}
delete(w.subscriptions, child)
Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n", w.broker.ID(), child.topic, child.partition, err)
}
}
}
Expand All @@ -460,13 +462,15 @@ func (w *brokerConsumer) updateSubscriptionCache(newSubscriptions []*partitionCo
// take new subscriptions, and abandon subscriptions that have been closed
for _, child := range newSubscriptions {
w.subscriptions[child] = none{}
Logger.Printf("consumer/broker/%d added subscription to %s/%d\n", w.broker.ID(), child.topic, child.partition)
}

for child := range w.subscriptions {
select {
case <-child.dying:
close(child.trigger)
delete(w.subscriptions, child)
Logger.Printf("consumer/broker/%d closed dead subscription to %s/%d\n", w.broker.ID(), child.topic, child.partition)
default:
}
}
Expand Down

0 comments on commit 94a77f7

Please sign in to comment.