diff --git a/async_producer.go b/async_producer.go
index 7424a3abd..e4e88ae93 100644
--- a/async_producer.go
+++ b/async_producer.go
@@ -504,9 +504,10 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) chan<- *ProducerMessag
 	go withRecover(a.run)
 
 	f := &flusher{
-		parent: p,
-		broker: broker,
-		input:  bridge,
+		parent:         p,
+		broker:         broker,
+		input:          bridge,
+		currentRetries: make(map[string]map[int32]error),
 	}
 	go withRecover(f.run)
 
@@ -582,17 +583,18 @@ shutdown:
 	close(a.output)
 }
 
-// one per broker
-// takes a batch at a time from the messageAggregator and sends to the broker
+// takes a batch at a time from the aggregator and sends to the broker
 type flusher struct {
 	parent *asyncProducer
 	broker *Broker
 	input  <-chan []*ProducerMessage
+
+	currentRetries map[string]map[int32]error
 }
 
 func (f *flusher) run() {
 	var closing error
-	currentRetries := make(map[string]map[int32]error)
+
 	Logger.Printf("producer/flusher/%d starting up\n", f.broker.ID())
 
 	for batch := range f.input {
@@ -601,30 +603,7 @@ func (f *flusher) run() {
 			continue
 		}
 
-		// group messages by topic/partition
-		msgSets := make(map[string]map[int32][]*ProducerMessage)
-		for i, msg := range batch {
-			if currentRetries[msg.Topic] != nil && currentRetries[msg.Topic][msg.Partition] != nil {
-				if msg.flags&chaser == chaser {
-					// we can start processing this topic/partition again
-					Logger.Printf("producer/flusher/%d state change to [normal] on %s/%d\n",
-						f.broker.ID(), msg.Topic, msg.Partition)
-					currentRetries[msg.Topic][msg.Partition] = nil
-				}
-				f.parent.retryMessages([]*ProducerMessage{msg}, currentRetries[msg.Topic][msg.Partition])
-				batch[i] = nil // to prevent it being returned/retried twice
-				continue
-			}
-
-			partitionSet := msgSets[msg.Topic]
-			if partitionSet == nil {
-				partitionSet = make(map[int32][]*ProducerMessage)
-				msgSets[msg.Topic] = partitionSet
-			}
-
-			partitionSet[msg.Partition] = append(partitionSet[msg.Partition], msg)
-		}
-
+		msgSets := f.groupAndFilter(batch)
 		request := f.parent.buildRequest(msgSets)
 		if request == nil {
 			continue
@@ -653,40 +632,77 @@ func (f *flusher) run() {
 			continue
 		}
 
-		// we iterate through the blocks in the request, not the response, so that we notice
-		// if the response is missing a block completely
-		for topic, partitionSet := range msgSets {
-			for partition, msgs := range partitionSet {
+		f.parseResponse(msgSets, response)
+	}
+	Logger.Printf("producer/flusher/%d shut down\n", f.broker.ID())
+}
 
-				block := response.GetBlock(topic, partition)
-				if block == nil {
-					f.parent.returnErrors(msgs, ErrIncompleteResponse)
-					continue
-				}
+func (f *flusher) groupAndFilter(batch []*ProducerMessage) map[string]map[int32][]*ProducerMessage {
+	msgSets := make(map[string]map[int32][]*ProducerMessage)
 
-				switch block.Err {
-				case ErrNoError:
-					// All the messages for this topic-partition were delivered successfully!
-					for i := range msgs {
-						msgs[i].Offset = block.Offset + int64(i)
-					}
-					f.parent.returnSuccesses(msgs)
-				case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable,
-					ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
-					Logger.Printf("producer/flusher/%d state change to [retrying] on %s/%d because %v\n",
-						f.broker.ID(), topic, partition, block.Err)
-					if currentRetries[topic] == nil {
-						currentRetries[topic] = make(map[int32]error)
-					}
-					currentRetries[topic][partition] = block.Err
-					f.parent.retryMessages(msgs, block.Err)
-				default:
-					f.parent.returnErrors(msgs, block.Err)
+	for i, msg := range batch {
+
+		if f.currentRetries[msg.Topic] != nil && f.currentRetries[msg.Topic][msg.Partition] != nil {
+			// we're currently retrying this partition so we need to filter out this message
+			f.parent.retryMessages([]*ProducerMessage{msg}, f.currentRetries[msg.Topic][msg.Partition])
+			batch[i] = nil
+
+			if msg.flags&chaser == chaser {
+				// ...but now we can start processing future messages again
+				Logger.Printf("producer/flusher/%d state change to [normal] on %s/%d\n",
+					f.broker.ID(), msg.Topic, msg.Partition)
+				delete(f.currentRetries[msg.Topic], msg.Partition)
+			}
+
+			continue
+		}
+
+		partitionSet := msgSets[msg.Topic]
+		if partitionSet == nil {
+			partitionSet = make(map[int32][]*ProducerMessage)
+			msgSets[msg.Topic] = partitionSet
+		}
+
+		partitionSet[msg.Partition] = append(partitionSet[msg.Partition], msg)
+	}
+
+	return msgSets
+}
+
+func (f *flusher) parseResponse(msgSets map[string]map[int32][]*ProducerMessage, response *ProduceResponse) {
+	// we iterate through the blocks in the request set, not the response, so that we notice
+	// if the response is missing a block completely
+	for topic, partitionSet := range msgSets {
+		for partition, msgs := range partitionSet {
+			block := response.GetBlock(topic, partition)
+			if block == nil {
+				f.parent.returnErrors(msgs, ErrIncompleteResponse)
+				continue
+			}
+
+			switch block.Err {
+			// Success
+			case ErrNoError:
+				for i := range msgs {
+					msgs[i].Offset = block.Offset + int64(i)
 				}
+				f.parent.returnSuccesses(msgs)
+			// Retriable errors
+			case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable,
+				ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
+				Logger.Printf("producer/flusher/%d state change to [retrying] on %s/%d because %v\n",
+					f.broker.ID(), topic, partition, block.Err)
+				if f.currentRetries[topic] == nil {
+					f.currentRetries[topic] = make(map[int32]error)
+				}
+				f.currentRetries[topic][partition] = block.Err
+				f.parent.retryMessages(msgs, block.Err)
+			// Other non-retriable errors
+			default:
+				f.parent.returnErrors(msgs, block.Err)
 			}
 		}
 	}
-	Logger.Printf("producer/flusher/%d shut down\n", f.broker.ID())
 }
 
 // singleton
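Note (reviewer sketch, not part of the diff): the refactor above threads all per-partition retry state through flusher.currentRetries, a topic -> partition -> error map. The standalone Go sketch below models just that gate, with simplified hypothetical types (message, retryGate) standing in for sarama's ProducerMessage and flusher: while a partition has a recorded error, its messages are diverted to the retry path, and a chaser-flagged message closes the window so normal delivery resumes.

// Sketch (assumed simplified types, not sarama's API) of the retry gate that
// flusher.groupAndFilter implements.
package main

import "fmt"

const chaser = 1 << 0 // hypothetical flag bit, mirroring the diff's msg.flags&chaser test

type message struct {
	topic     string
	partition int32
	flags     int
}

type retryGate struct {
	currentRetries map[string]map[int32]error
}

// accept reports whether a message may be sent now; false means it must be
// re-queued for retry, like groupAndFilter's batch[i] = nil path.
func (g *retryGate) accept(m message) bool {
	if g.currentRetries[m.topic] != nil && g.currentRetries[m.topic][m.partition] != nil {
		if m.flags&chaser == chaser {
			// the chaser closes the retry window for this topic/partition
			delete(g.currentRetries[m.topic], m.partition)
		}
		return false
	}
	return true
}

// markRetrying records a retriable broker error, as parseResponse does.
func (g *retryGate) markRetrying(topic string, partition int32, err error) {
	if g.currentRetries[topic] == nil {
		g.currentRetries[topic] = make(map[int32]error)
	}
	g.currentRetries[topic][partition] = err
}

func main() {
	g := &retryGate{currentRetries: make(map[string]map[int32]error)}
	g.markRetrying("events", 0, fmt.Errorf("not leader for partition"))

	fmt.Println(g.accept(message{"events", 0, 0}))      // false: partition is retrying
	fmt.Println(g.accept(message{"events", 0, chaser})) // false: chaser itself retried, window closes
	fmt.Println(g.accept(message{"events", 0, 0}))      // true: back to normal delivery
}

The same two-step contract appears in the diff: parseResponse records the retriable error, and groupAndFilter enforces it on every subsequent batch until the chaser arrives.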