Skip to content

Commit

Permalink
Merge pull request #429 from Shopify/producer-loggin
Browse files Browse the repository at this point in the history
Producer logging tweaks
  • Loading branch information
eapache committed Apr 17, 2015
2 parents 3b52e6b + 52110d0 commit 4b2a9ef
Showing 1 changed file with 8 additions and 9 deletions.
17 changes: 8 additions & 9 deletions async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,10 +342,10 @@ func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input ch
// new, higher, retry level; send off a chaser so that we know when everything "in between" has made it
// back to us and we can safely flush the backlog (otherwise we risk re-ordering messages)
highWatermark = msg.retries
Logger.Printf("producer/leader state change to [retrying-%d] on %s/%d\n", highWatermark, topic, partition)
Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", topic, partition, highWatermark)
retryState[msg.retries].expectChaser = true
output <- &ProducerMessage{Topic: topic, Partition: partition, flags: chaser, retries: msg.retries - 1}
Logger.Printf("producer/leader abandoning broker %d on %s/%d\n", leader.ID(), topic, partition)
Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", topic, partition, leader.ID())
p.unrefBrokerProducer(leader, output)
output = nil
time.Sleep(p.conf.Producer.Retry.Backoff)
Expand All @@ -363,17 +363,16 @@ func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input ch
// this message is of the current retry level (msg.retries == highWatermark) and the chaser flag is set,
// meaning this retry level is done and we can go down (at least) one level and flush that
retryState[highWatermark].expectChaser = false
Logger.Printf("producer/leader state change to [normal-%d] on %s/%d\n", highWatermark, topic, partition)
Logger.Printf("producer/leader/%s/%d state change to [flushing-%d]\n", topic, partition, highWatermark)
for {
highWatermark--
Logger.Printf("producer/leader state change to [flushing-%d] on %s/%d\n", highWatermark, topic, partition)

if output == nil {
if err := breaker.Run(doUpdate); err != nil {
p.returnErrors(retryState[highWatermark].buf, err)
goto flushDone
}
Logger.Printf("producer/leader selected broker %d on %s/%d\n", leader.ID(), topic, partition)
Logger.Printf("producer/leader/%s/%d selected broker %d\n", topic, partition, leader.ID())
}

for _, msg := range retryState[highWatermark].buf {
Expand All @@ -383,11 +382,11 @@ func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input ch
flushDone:
retryState[highWatermark].buf = nil
if retryState[highWatermark].expectChaser {
Logger.Printf("producer/leader state change to [retrying-%d] on %s/%d\n", highWatermark, topic, partition)
Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", topic, partition, highWatermark)
break
} else {
Logger.Printf("producer/leader state change to [normal-%d] on %s/%d\n", highWatermark, topic, partition)
if highWatermark == 0 {
Logger.Printf("producer/leader/%s/%d state change to [normal]\n", topic, partition)
break
}
}
Expand All @@ -406,7 +405,7 @@ func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input ch
time.Sleep(p.conf.Producer.Retry.Backoff)
continue
}
Logger.Printf("producer/leader selected broker %d on %s/%d\n", leader.ID(), topic, partition)
Logger.Printf("producer/leader/%s/%d selected broker %d\n", topic, partition, leader.ID())
}

output <- msg
Expand Down Expand Up @@ -447,7 +446,7 @@ func (p *asyncProducer) messageAggregator(broker *Broker, input chan *ProducerMe
if (bytesAccumulated+msg.byteSize() >= forceFlushThreshold()) ||
(p.conf.Producer.Compression != CompressionNone && bytesAccumulated+msg.byteSize() >= p.conf.Producer.MaxMessageBytes) ||
(p.conf.Producer.Flush.MaxMessages > 0 && len(buffer) >= p.conf.Producer.Flush.MaxMessages) {
Logger.Println("producer/aggregator maximum request accumulated, forcing blocking flush")
Logger.Printf("producer/aggregator/%d maximum request accumulated, forcing blocking flush\n", broker.ID())
output <- buffer
timer = nil
buffer = nil
Expand Down

0 comments on commit 4b2a9ef

Please sign in to comment.