From 52110d0cca32d3da893b83d2f451412199eec372 Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Fri, 17 Apr 2015 13:52:56 +0000 Subject: [PATCH] Producer logging tweaks - in leaderDispatcher, make the topic/partition part of the initial log slug for convenience and consistency - don't log on every switch from normal to flushing in the loop, just the first, since 99% of the time we burn right through them all and we only need one line to indicate that, not 5 or 10 - similarly, don't log on every switch from flushing to normal, just do so if the highWatermark is 0 and it breaks the loop - in combination with the previous change we end up with just an entry and exit log for the loop, rather than two logs per iteration - in messageAggregator, make the broker ID part of the slug (it wasn't even logged before) --- async_producer.go | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/async_producer.go b/async_producer.go index 5ef33cd05..59c29a219 100644 --- a/async_producer.go +++ b/async_producer.go @@ -342,10 +342,10 @@ func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input ch // new, higher, retry level; send off a chaser so that we know when everything "in between" has made it // back to us and we can safely flush the backlog (otherwise we risk re-ordering messages) highWatermark = msg.retries - Logger.Printf("producer/leader state change to [retrying-%d] on %s/%d\n", highWatermark, topic, partition) + Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", topic, partition, highWatermark) retryState[msg.retries].expectChaser = true output <- &ProducerMessage{Topic: topic, Partition: partition, flags: chaser, retries: msg.retries - 1} - Logger.Printf("producer/leader abandoning broker %d on %s/%d\n", leader.ID(), topic, partition) + Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", topic, partition, leader.ID()) p.unrefBrokerProducer(leader, output) output = nil time.Sleep(p.conf.Producer.Retry.Backoff) @@ -363,17 +363,16 @@ func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input ch // this message is of the current retry level (msg.retries == highWatermark) and the chaser flag is set, // meaning this retry level is done and we can go down (at least) one level and flush that retryState[highWatermark].expectChaser = false - Logger.Printf("producer/leader state change to [normal-%d] on %s/%d\n", highWatermark, topic, partition) + Logger.Printf("producer/leader/%s/%d state change to [flushing-%d]\n", topic, partition, highWatermark) for { highWatermark-- - Logger.Printf("producer/leader state change to [flushing-%d] on %s/%d\n", highWatermark, topic, partition) if output == nil { if err := breaker.Run(doUpdate); err != nil { p.returnErrors(retryState[highWatermark].buf, err) goto flushDone } - Logger.Printf("producer/leader selected broker %d on %s/%d\n", leader.ID(), topic, partition) + Logger.Printf("producer/leader/%s/%d selected broker %d\n", topic, partition, leader.ID()) } for _, msg := range retryState[highWatermark].buf { @@ -383,11 +382,11 @@ func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input ch flushDone: retryState[highWatermark].buf = nil if retryState[highWatermark].expectChaser { - Logger.Printf("producer/leader state change to [retrying-%d] on %s/%d\n", highWatermark, topic, partition) + Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", topic, partition, highWatermark) break } else { - Logger.Printf("producer/leader state change to [normal-%d] on %s/%d\n", highWatermark, topic, partition) if highWatermark == 0 { + Logger.Printf("producer/leader/%s/%d state change to [normal]\n", topic, partition) break } } @@ -406,7 +405,7 @@ func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input ch time.Sleep(p.conf.Producer.Retry.Backoff) continue } - Logger.Printf("producer/leader selected broker %d on %s/%d\n", leader.ID(), topic, partition) + Logger.Printf("producer/leader/%s/%d selected broker %d\n", topic, partition, leader.ID()) } output <- msg @@ -447,7 +446,7 @@ func (p *asyncProducer) messageAggregator(broker *Broker, input chan *ProducerMe if (bytesAccumulated+msg.byteSize() >= forceFlushThreshold()) || (p.conf.Producer.Compression != CompressionNone && bytesAccumulated+msg.byteSize() >= p.conf.Producer.MaxMessageBytes) || (p.conf.Producer.Flush.MaxMessages > 0 && len(buffer) >= p.conf.Producer.Flush.MaxMessages) { - Logger.Println("producer/aggregator maximum request accumulated, forcing blocking flush") + Logger.Printf("producer/aggregator/%d maximum request accumulated, forcing blocking flush\n", broker.ID()) output <- buffer timer = nil buffer = nil