From f15799eeb31dc1589414af13aaf936dd0a3797b4 Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Thu, 24 Sep 2015 16:26:52 -0400 Subject: [PATCH] Set the flag *before* we retry the message Go correctly caught this as a race condition. --- async_producer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/async_producer.go b/async_producer.go index 92f9f53fd..8ce8b33e9 100644 --- a/async_producer.go +++ b/async_producer.go @@ -667,8 +667,8 @@ func (f *flusher) groupAndFilter(batch []*ProducerMessage) map[string]map[int32] if f.currentRetries[msg.Topic] != nil && f.currentRetries[msg.Topic][msg.Partition] != nil { // we're currently retrying this partition so we need to filter out this message - f.parent.retryMessages([]*ProducerMessage{msg}, f.currentRetries[msg.Topic][msg.Partition]) msg.flags |= returned + f.parent.retryMessages([]*ProducerMessage{msg}, f.currentRetries[msg.Topic][msg.Partition]) if msg.flags&chaser == chaser { // ...but now we can start processing future messages again