diff --git a/async_producer.go b/async_producer.go index 92f9f53fd..8ce8b33e9 100644 --- a/async_producer.go +++ b/async_producer.go @@ -667,8 +667,8 @@ func (f *flusher) groupAndFilter(batch []*ProducerMessage) map[string]map[int32] if f.currentRetries[msg.Topic] != nil && f.currentRetries[msg.Topic][msg.Partition] != nil { // we're currently retrying this partition so we need to filter out this message - f.parent.retryMessages([]*ProducerMessage{msg}, f.currentRetries[msg.Topic][msg.Partition]) msg.flags |= returned + f.parent.retryMessages([]*ProducerMessage{msg}, f.currentRetries[msg.Topic][msg.Partition]) if msg.flags&chaser == chaser { // ...but now we can start processing future messages again