diff --git a/async_producer.go b/async_producer.go
index 8e229490f..7cab8db78 100644
--- a/async_producer.go
+++ b/async_producer.go
@@ -490,13 +490,14 @@ func (pp *partitionProducer) updateLeader() error {
 // one per broker, constructs both an aggregator and a flusher
 func (p *asyncProducer) newBrokerProducer(broker *Broker) chan<- *ProducerMessage {
 	input := make(chan *ProducerMessage)
-	bridge := make(chan []*ProducerMessage)
+	bridge := make(chan messageBuffer)
 
 	a := &aggregator{
 		parent: p,
 		broker: broker,
 		input:  input,
 		output: bridge,
+		buffer: make(messageBuffer),
 	}
 	go withRecover(a.run)
 
@@ -511,21 +512,31 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) chan<- *ProducerMessag
 	return input
 }
 
+type messageBuffer map[string]map[int32][]*ProducerMessage
+
+func (mb messageBuffer) each(action func([]*ProducerMessage, error), err error) {
+	for _, partitions := range mb {
+		for _, messages := range partitions {
+			action(messages, err)
+		}
+	}
+}
+
 // groups messages together into appropriately-sized batches for sending to the broker
 // based on https://godoc.org/github.com/eapache/channels#BatchingChannel
 type aggregator struct {
 	parent *asyncProducer
 	broker *Broker
 	input  <-chan *ProducerMessage
-	output chan<- []*ProducerMessage
+	output chan<- messageBuffer
 
-	buffer      []*ProducerMessage
-	bufferBytes int
-	timer       <-chan time.Time
+	buffer                   messageBuffer
+	bufferCount, bufferBytes int
+	timer                    <-chan time.Time
 }
 
 func (a *aggregator) run() {
-	var output chan<- []*ProducerMessage
+	var output chan<- messageBuffer
 
 	for {
 		select {
@@ -541,7 +552,11 @@
 				output = nil
 			}
 
-			a.buffer = append(a.buffer, msg)
+			if a.buffer[msg.Topic] == nil {
+				a.buffer[msg.Topic] = make(map[int32][]*ProducerMessage)
+			}
+			a.buffer[msg.Topic][msg.Partition] = append(a.buffer[msg.Topic][msg.Partition], msg)
+			a.bufferCount += 1
 			a.bufferBytes += msg.byteSize()
 
 			if a.readyToFlush(msg) {
@@ -573,7 +588,7 @@ func (a *aggregator) wouldOverflow(msg *ProducerMessage) bool {
 	case a.parent.conf.Producer.Compression != CompressionNone && a.bufferBytes+msg.byteSize() >= a.parent.conf.Producer.MaxMessageBytes:
 		return true
 	// Would we overflow simply in number of messages?
-	case a.parent.conf.Producer.Flush.MaxMessages > 0 && len(a.buffer) >= a.parent.conf.Producer.Flush.MaxMessages:
+	case a.parent.conf.Producer.Flush.MaxMessages > 0 && a.bufferCount >= a.parent.conf.Producer.Flush.MaxMessages:
 		return true
 	default:
 		return false
@@ -589,7 +604,7 @@ func (a *aggregator) readyToFlush(msg *ProducerMessage) bool {
 	case msg.flags&chaser == chaser:
 		return true
 	// If we've passed the message trigger-point
-	case a.parent.conf.Producer.Flush.Messages > 0 && len(a.buffer) >= a.parent.conf.Producer.Flush.Messages:
+	case a.parent.conf.Producer.Flush.Messages > 0 && a.bufferCount >= a.parent.conf.Producer.Flush.Messages:
 		return true
 	// If we've passed the byte trigger-point
 	case a.parent.conf.Producer.Flush.Bytes > 0 && a.bufferBytes >= a.parent.conf.Producer.Flush.Bytes:
@@ -601,15 +616,16 @@ func (a *aggregator) readyToFlush(msg *ProducerMessage) bool {
 
 func (a *aggregator) reset() {
 	a.timer = nil
-	a.buffer = nil
+	a.buffer = make(messageBuffer)
 	a.bufferBytes = 0
+	a.bufferCount = 0
 }
 
 // takes a batch at a time from the aggregator and sends to the broker
 type flusher struct {
 	parent *asyncProducer
 	broker *Broker
-	input  <-chan []*ProducerMessage
+	input  <-chan messageBuffer
 
 	currentRetries map[string]map[int32]error
 }
@@ -621,12 +637,12 @@
 	for batch := range f.input {
 		if closing != nil {
-			f.parent.retryMessages(batch, closing)
+			batch.each(f.parent.retryMessages, closing)
 			continue
 		}
 
-		msgSets := f.groupAndFilter(batch)
-		request := f.parent.buildRequest(msgSets)
+		f.filter(batch)
+		request := f.buildRequest(batch)
 		if request == nil {
 			continue
 		}
 
@@ -637,58 +653,116 @@
 		switch err.(type) {
 		case nil:
 			break
 		case PacketEncodingError:
-			f.parent.returnErrors(batch, err)
+			batch.each(f.parent.returnErrors, err)
 			continue
 		default:
 			Logger.Printf("producer/flusher/%d state change to [closing] because %s\n", f.broker.ID(), err)
 			f.parent.abandonBrokerConnection(f.broker)
 			_ = f.broker.Close()
 			closing = err
-			f.parent.retryMessages(batch, err)
+			batch.each(f.parent.retryMessages, err)
 			continue
 		}
 
 		if response == nil {
 			// this only happens when RequiredAcks is NoResponse, so we have to assume success
-			f.parent.returnSuccesses(batch)
+			batch.each(f.parent.returnSuccesses, nil)
 			continue
 		}
 
-		f.parseResponse(msgSets, response)
+		f.parseResponse(batch, response)
 	}
 	Logger.Printf("producer/flusher/%d shut down\n", f.broker.ID())
 }
 
-func (f *flusher) groupAndFilter(batch []*ProducerMessage) map[string]map[int32][]*ProducerMessage {
-	msgSets := make(map[string]map[int32][]*ProducerMessage)
+func (f *flusher) filter(batch messageBuffer) {
+	for topic, partitions := range batch {
+		for partition, messages := range partitions {
+			for i, msg := range messages {
+				if f.currentRetries[topic] != nil && f.currentRetries[topic][partition] != nil {
+					// we're currently retrying this partition so we need to filter out this message
+					f.parent.retryMessages([]*ProducerMessage{msg}, f.currentRetries[topic][partition])
+					messages[i] = nil
+
+					if msg.flags&chaser == chaser {
+						// ...but now we can start processing future messages again
+						Logger.Printf("producer/flusher/%d state change to [normal] on %s/%d\n",
+							f.broker.ID(), topic, partition)
+						delete(f.currentRetries[topic], partition)
+					}
+				}
+			}
+		}
+	}
+}
 
-	for i, msg := range batch {
+func (f *flusher) buildRequest(batch map[string]map[int32][]*ProducerMessage) *ProduceRequest {
-		if f.currentRetries[msg.Topic] != nil && f.currentRetries[msg.Topic][msg.Partition] != nil {
-			// we're currently retrying this partition so we need to filter out this message
-			f.parent.retryMessages([]*ProducerMessage{msg}, f.currentRetries[msg.Topic][msg.Partition])
-			batch[i] = nil
+	req := &ProduceRequest{
+		RequiredAcks: f.parent.conf.Producer.RequiredAcks,
+		Timeout:      int32(f.parent.conf.Producer.Timeout / time.Millisecond),
+	}
+	empty := true
 
-			if msg.flags&chaser == chaser {
-				// ...but now we can start processing future messages again
-				Logger.Printf("producer/flusher/%d state change to [normal] on %s/%d\n",
-					f.broker.ID(), msg.Topic, msg.Partition)
-				delete(f.currentRetries[msg.Topic], msg.Partition)
-			}
+	for topic, partitionSet := range batch {
+		for partition, msgSet := range partitionSet {
+			setToSend := new(MessageSet)
+			setSize := 0
+			for _, msg := range msgSet {
+				if msg == nil {
+					continue
+				}
 
-			continue
-		}
+				var keyBytes, valBytes []byte
+				var err error
+				if msg.Key != nil {
+					if keyBytes, err = msg.Key.Encode(); err != nil {
+						f.parent.returnError(msg, err)
+						continue
+					}
+				}
+				if msg.Value != nil {
+					if valBytes, err = msg.Value.Encode(); err != nil {
+						f.parent.returnError(msg, err)
+						continue
+					}
+				}
 
-		partitionSet := msgSets[msg.Topic]
-		if partitionSet == nil {
-			partitionSet = make(map[int32][]*ProducerMessage)
-			msgSets[msg.Topic] = partitionSet
-		}
+				if f.parent.conf.Producer.Compression != CompressionNone && setSize+msg.byteSize() > f.parent.conf.Producer.MaxMessageBytes {
+					// compression causes message-sets to be wrapped as single messages, which have tighter
+					// size requirements, so we have to respect those limits
+					valBytes, err := encode(setToSend)
+					if err != nil {
+						Logger.Println(err) // if this happens, it's basically our fault.
+						panic(err)
+					}
+					req.AddMessage(topic, partition, &Message{Codec: f.parent.conf.Producer.Compression, Key: nil, Value: valBytes})
+					setToSend = new(MessageSet)
+					setSize = 0
+				}
+				setSize += msg.byteSize()
 
-		partitionSet[msg.Partition] = append(partitionSet[msg.Partition], msg)
+				setToSend.addMessage(&Message{Codec: CompressionNone, Key: keyBytes, Value: valBytes})
+				empty = false
+			}
+
+			if f.parent.conf.Producer.Compression == CompressionNone {
+				req.AddSet(topic, partition, setToSend)
+			} else {
+				valBytes, err := encode(setToSend)
+				if err != nil {
+					Logger.Println(err) // if this happens, it's basically our fault.
+					panic(err)
+				}
+				req.AddMessage(topic, partition, &Message{Codec: f.parent.conf.Producer.Compression, Key: nil, Value: valBytes})
+			}
+		}
 	}
 
-	return msgSets
+	if empty {
+		return nil
+	}
+	return req
 }
 
 func (f *flusher) parseResponse(msgSets map[string]map[int32][]*ProducerMessage, response *ProduceResponse) {
@@ -708,7 +782,7 @@ func (f *flusher) parseResponse(msgSets map[string]map[int32][]*ProducerMessage,
 			for i := range msgs {
 				msgs[i].Offset = block.Offset + int64(i)
 			}
-			f.parent.returnSuccesses(msgs)
+			f.parent.returnSuccesses(msgs, nil)
 		// Retriable errors
 		case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable,
 			ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
@@ -776,68 +850,6 @@ func (p *asyncProducer) shutdown() {
 	close(p.successes)
 }
 
-func (p *asyncProducer) buildRequest(batch map[string]map[int32][]*ProducerMessage) *ProduceRequest {
-
-	req := &ProduceRequest{RequiredAcks: p.conf.Producer.RequiredAcks, Timeout: int32(p.conf.Producer.Timeout / time.Millisecond)}
-	empty := true
-
-	for topic, partitionSet := range batch {
-		for partition, msgSet := range partitionSet {
-			setToSend := new(MessageSet)
-			setSize := 0
-			for _, msg := range msgSet {
-				var keyBytes, valBytes []byte
-				var err error
-				if msg.Key != nil {
-					if keyBytes, err = msg.Key.Encode(); err != nil {
-						p.returnError(msg, err)
-						continue
-					}
-				}
-				if msg.Value != nil {
-					if valBytes, err = msg.Value.Encode(); err != nil {
-						p.returnError(msg, err)
-						continue
-					}
-				}
-
-				if p.conf.Producer.Compression != CompressionNone && setSize+msg.byteSize() > p.conf.Producer.MaxMessageBytes {
-					// compression causes message-sets to be wrapped as single messages, which have tighter
-					// size requirements, so we have to respect those limits
-					valBytes, err := encode(setToSend)
-					if err != nil {
-						Logger.Println(err) // if this happens, it's basically our fault.
-						panic(err)
-					}
-					req.AddMessage(topic, partition, &Message{Codec: p.conf.Producer.Compression, Key: nil, Value: valBytes})
-					setToSend = new(MessageSet)
-					setSize = 0
-				}
-				setSize += msg.byteSize()
-
-				setToSend.addMessage(&Message{Codec: CompressionNone, Key: keyBytes, Value: valBytes})
-				empty = false
-			}
-
-			if p.conf.Producer.Compression == CompressionNone {
-				req.AddSet(topic, partition, setToSend)
-			} else {
-				valBytes, err := encode(setToSend)
-				if err != nil {
-					Logger.Println(err) // if this happens, it's basically our fault.
-					panic(err)
-				}
-				req.AddMessage(topic, partition, &Message{Codec: p.conf.Producer.Compression, Key: nil, Value: valBytes})
-			}
-		}
-	}
-
-	if empty {
-		return nil
-	}
-	return req
-}
-
 func (p *asyncProducer) returnError(msg *ProducerMessage, err error) {
 	msg.clear()
 	pErr := &ProducerError{Msg: msg, Err: err}
@@ -857,7 +869,7 @@ func (p *asyncProducer) returnErrors(batch []*ProducerMessage, err error) {
 	}
 }
 
-func (p *asyncProducer) returnSuccesses(batch []*ProducerMessage) {
+func (p *asyncProducer) returnSuccesses(batch []*ProducerMessage, unused error) {
 	for _, msg := range batch {
 		if msg == nil {
 			continue