From 8b5d6d827bc6902abca7977afc79b7c5f35ba051 Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Thu, 24 Sep 2015 16:22:49 -0400 Subject: [PATCH] Quick hacky fix for #449 Pending a proper refactor and tests etc. --- async_producer.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/async_producer.go b/async_producer.go index 8e229490f8..8ce8b33e9d 100644 --- a/async_producer.go +++ b/async_producer.go @@ -103,6 +103,7 @@ type flagSet int8 const ( chaser flagSet = 1 << iota // message is last in a group that failed shutdown // start the shutdown process + returned // returned mid-processing, so skip it (hacky fix for #449) ) // ProducerMessage is the collection of elements passed to the Producer in order to send a message. @@ -662,12 +663,12 @@ func (f *flusher) run() { func (f *flusher) groupAndFilter(batch []*ProducerMessage) map[string]map[int32][]*ProducerMessage { msgSets := make(map[string]map[int32][]*ProducerMessage) - for i, msg := range batch { + for _, msg := range batch { if f.currentRetries[msg.Topic] != nil && f.currentRetries[msg.Topic][msg.Partition] != nil { // we're currently retrying this partition so we need to filter out this message + msg.flags |= returned f.parent.retryMessages([]*ProducerMessage{msg}, f.currentRetries[msg.Topic][msg.Partition]) - batch[i] = nil if msg.flags&chaser == chaser { // ...but now we can start processing future messages again @@ -791,12 +792,14 @@ func (p *asyncProducer) buildRequest(batch map[string]map[int32][]*ProducerMessa if msg.Key != nil { if keyBytes, err = msg.Key.Encode(); err != nil { p.returnError(msg, err) + msg.flags |= returned continue } } if msg.Value != nil { if valBytes, err = msg.Value.Encode(); err != nil { p.returnError(msg, err) + msg.flags |= returned continue } } @@ -851,7 +854,7 @@ func (p *asyncProducer) returnError(msg *ProducerMessage, err error) { func (p *asyncProducer) returnErrors(batch []*ProducerMessage, err error) { for _, msg := range batch { - if msg != nil { + if msg.flags&returned == 0 { p.returnError(msg, err) } } @@ -859,7 +862,7 @@ func (p *asyncProducer) returnErrors(batch []*ProducerMessage, err error) { func (p *asyncProducer) returnSuccesses(batch []*ProducerMessage) { for _, msg := range batch { - if msg == nil { + if msg.flags&returned == returned { continue } if p.conf.Producer.Return.Successes {