diff --git a/async_producer.go b/async_producer.go index 5ef33cd05..59c29a219 100644 --- a/async_producer.go +++ b/async_producer.go @@ -342,10 +342,10 @@ func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input ch // new, higher, retry level; send off a chaser so that we know when everything "in between" has made it // back to us and we can safely flush the backlog (otherwise we risk re-ordering messages) highWatermark = msg.retries - Logger.Printf("producer/leader state change to [retrying-%d] on %s/%d\n", highWatermark, topic, partition) + Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", topic, partition, highWatermark) retryState[msg.retries].expectChaser = true output <- &ProducerMessage{Topic: topic, Partition: partition, flags: chaser, retries: msg.retries - 1} - Logger.Printf("producer/leader abandoning broker %d on %s/%d\n", leader.ID(), topic, partition) + Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", topic, partition, leader.ID()) p.unrefBrokerProducer(leader, output) output = nil time.Sleep(p.conf.Producer.Retry.Backoff) @@ -363,17 +363,16 @@ func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input ch // this message is of the current retry level (msg.retries == highWatermark) and the chaser flag is set, // meaning this retry level is done and we can go down (at least) one level and flush that retryState[highWatermark].expectChaser = false - Logger.Printf("producer/leader state change to [normal-%d] on %s/%d\n", highWatermark, topic, partition) + Logger.Printf("producer/leader/%s/%d state change to [flushing-%d]\n", topic, partition, highWatermark) for { highWatermark-- - Logger.Printf("producer/leader state change to [flushing-%d] on %s/%d\n", highWatermark, topic, partition) if output == nil { if err := breaker.Run(doUpdate); err != nil { p.returnErrors(retryState[highWatermark].buf, err) goto flushDone } - Logger.Printf("producer/leader selected broker %d on %s/%d\n", leader.ID(), topic, partition) + Logger.Printf("producer/leader/%s/%d selected broker %d\n", topic, partition, leader.ID()) } for _, msg := range retryState[highWatermark].buf { @@ -383,11 +382,11 @@ func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input ch flushDone: retryState[highWatermark].buf = nil if retryState[highWatermark].expectChaser { - Logger.Printf("producer/leader state change to [retrying-%d] on %s/%d\n", highWatermark, topic, partition) + Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", topic, partition, highWatermark) break } else { - Logger.Printf("producer/leader state change to [normal-%d] on %s/%d\n", highWatermark, topic, partition) if highWatermark == 0 { + Logger.Printf("producer/leader/%s/%d state change to [normal]\n", topic, partition) break } } @@ -406,7 +405,7 @@ func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input ch time.Sleep(p.conf.Producer.Retry.Backoff) continue } - Logger.Printf("producer/leader selected broker %d on %s/%d\n", leader.ID(), topic, partition) + Logger.Printf("producer/leader/%s/%d selected broker %d\n", topic, partition, leader.ID()) } output <- msg @@ -447,7 +446,7 @@ func (p *asyncProducer) messageAggregator(broker *Broker, input chan *ProducerMe if (bytesAccumulated+msg.byteSize() >= forceFlushThreshold()) || (p.conf.Producer.Compression != CompressionNone && bytesAccumulated+msg.byteSize() >= p.conf.Producer.MaxMessageBytes) || (p.conf.Producer.Flush.MaxMessages > 0 && len(buffer) >= p.conf.Producer.Flush.MaxMessages) { - Logger.Println("producer/aggregator maximum request accumulated, forcing blocking flush") + Logger.Printf("producer/aggregator/%d maximum request accumulated, forcing blocking flush\n", broker.ID()) output <- buffer timer = nil buffer = nil