From 7d70e73f37d0944423ac65f31a9f5506f31e004e Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Thu, 13 Aug 2015 15:49:43 -0400 Subject: [PATCH] Shuffle messages less in the producer. Put them in a map right up front in the aggregator, it only requires tracking one exta piece of metadata (total messages in the map) and it means we don't have to shuffle them into this form before constructing the request anyways. One piece of #433. --- async_producer.go | 220 ++++++++++++++++++++++++---------------------- 1 file changed, 116 insertions(+), 104 deletions(-) diff --git a/async_producer.go b/async_producer.go index 8e229490f..7cab8db78 100644 --- a/async_producer.go +++ b/async_producer.go @@ -490,13 +490,14 @@ func (pp *partitionProducer) updateLeader() error { // one per broker, constructs both an aggregator and a flusher func (p *asyncProducer) newBrokerProducer(broker *Broker) chan<- *ProducerMessage { input := make(chan *ProducerMessage) - bridge := make(chan []*ProducerMessage) + bridge := make(chan messageBuffer) a := &aggregator{ parent: p, broker: broker, input: input, output: bridge, + buffer: make(messageBuffer), } go withRecover(a.run) @@ -511,21 +512,31 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) chan<- *ProducerMessag return input } +type messageBuffer map[string]map[int32][]*ProducerMessage + +func (mb messageBuffer) each(action func([]*ProducerMessage, error), err error) { + for _, partitions := range mb { + for _, messages := range partitions { + action(messages, err) + } + } +} + // groups messages together into appropriately-sized batches for sending to the broker // based on https://godoc.org/github.com/eapache/channels#BatchingChannel type aggregator struct { parent *asyncProducer broker *Broker input <-chan *ProducerMessage - output chan<- []*ProducerMessage + output chan<- messageBuffer - buffer []*ProducerMessage - bufferBytes int - timer <-chan time.Time + buffer messageBuffer + bufferCount, bufferBytes int + timer <-chan time.Time } func (a *aggregator) run() { - var output chan<- []*ProducerMessage + var output chan<- messageBuffer for { select { @@ -541,7 +552,11 @@ func (a *aggregator) run() { output = nil } - a.buffer = append(a.buffer, msg) + if a.buffer[msg.Topic] == nil { + a.buffer[msg.Topic] = make(map[int32][]*ProducerMessage) + } + a.buffer[msg.Topic][msg.Partition] = append(a.buffer[msg.Topic][msg.Partition], msg) + a.bufferCount += 1 a.bufferBytes += msg.byteSize() if a.readyToFlush(msg) { @@ -573,7 +588,7 @@ func (a *aggregator) wouldOverflow(msg *ProducerMessage) bool { case a.parent.conf.Producer.Compression != CompressionNone && a.bufferBytes+msg.byteSize() >= a.parent.conf.Producer.MaxMessageBytes: return true // Would we overflow simply in number of messages? - case a.parent.conf.Producer.Flush.MaxMessages > 0 && len(a.buffer) >= a.parent.conf.Producer.Flush.MaxMessages: + case a.parent.conf.Producer.Flush.MaxMessages > 0 && a.bufferCount >= a.parent.conf.Producer.Flush.MaxMessages: return true default: return false @@ -589,7 +604,7 @@ func (a *aggregator) readyToFlush(msg *ProducerMessage) bool { case msg.flags&chaser == chaser: return true // If we've passed the message trigger-point - case a.parent.conf.Producer.Flush.Messages > 0 && len(a.buffer) >= a.parent.conf.Producer.Flush.Messages: + case a.parent.conf.Producer.Flush.Messages > 0 && a.bufferCount >= a.parent.conf.Producer.Flush.Messages: return true // If we've passed the byte trigger-point case a.parent.conf.Producer.Flush.Bytes > 0 && a.bufferBytes >= a.parent.conf.Producer.Flush.Bytes: @@ -601,15 +616,16 @@ func (a *aggregator) readyToFlush(msg *ProducerMessage) bool { func (a *aggregator) reset() { a.timer = nil - a.buffer = nil + a.buffer = make(messageBuffer) a.bufferBytes = 0 + a.bufferCount = 0 } // takes a batch at a time from the aggregator and sends to the broker type flusher struct { parent *asyncProducer broker *Broker - input <-chan []*ProducerMessage + input <-chan messageBuffer currentRetries map[string]map[int32]error } @@ -621,12 +637,12 @@ func (f *flusher) run() { for batch := range f.input { if closing != nil { - f.parent.retryMessages(batch, closing) + batch.each(f.parent.retryMessages, closing) continue } - msgSets := f.groupAndFilter(batch) - request := f.parent.buildRequest(msgSets) + f.filter(batch) + request := f.buildRequest(batch) if request == nil { continue } @@ -637,58 +653,116 @@ func (f *flusher) run() { case nil: break case PacketEncodingError: - f.parent.returnErrors(batch, err) + batch.each(f.parent.returnErrors, err) continue default: Logger.Printf("producer/flusher/%d state change to [closing] because %s\n", f.broker.ID(), err) f.parent.abandonBrokerConnection(f.broker) _ = f.broker.Close() closing = err - f.parent.retryMessages(batch, err) + batch.each(f.parent.retryMessages, err) continue } if response == nil { // this only happens when RequiredAcks is NoResponse, so we have to assume success - f.parent.returnSuccesses(batch) + batch.each(f.parent.returnSuccesses, nil) continue } - f.parseResponse(msgSets, response) + f.parseResponse(batch, response) } Logger.Printf("producer/flusher/%d shut down\n", f.broker.ID()) } -func (f *flusher) groupAndFilter(batch []*ProducerMessage) map[string]map[int32][]*ProducerMessage { - msgSets := make(map[string]map[int32][]*ProducerMessage) +func (f *flusher) filter(batch messageBuffer) { + for topic, partitions := range batch { + for partition, messages := range partitions { + for i, msg := range messages { + if f.currentRetries[topic] != nil && f.currentRetries[topic][partition] != nil { + // we're currently retrying this partition so we need to filter out this message + f.parent.retryMessages([]*ProducerMessage{msg}, f.currentRetries[topic][partition]) + messages[i] = nil + + if msg.flags&chaser == chaser { + // ...but now we can start processing future messages again + Logger.Printf("producer/flusher/%d state change to [normal] on %s/%d\n", + f.broker.ID(), topic, partition) + delete(f.currentRetries[topic], partition) + } + } + } + } + } +} - for i, msg := range batch { +func (f *flusher) buildRequest(batch map[string]map[int32][]*ProducerMessage) *ProduceRequest { - if f.currentRetries[msg.Topic] != nil && f.currentRetries[msg.Topic][msg.Partition] != nil { - // we're currently retrying this partition so we need to filter out this message - f.parent.retryMessages([]*ProducerMessage{msg}, f.currentRetries[msg.Topic][msg.Partition]) - batch[i] = nil + req := &ProduceRequest{ + RequiredAcks: f.parent.conf.Producer.RequiredAcks, + Timeout: int32(f.parent.conf.Producer.Timeout / time.Millisecond), + } + empty := true - if msg.flags&chaser == chaser { - // ...but now we can start processing future messages again - Logger.Printf("producer/flusher/%d state change to [normal] on %s/%d\n", - f.broker.ID(), msg.Topic, msg.Partition) - delete(f.currentRetries[msg.Topic], msg.Partition) - } + for topic, partitionSet := range batch { + for partition, msgSet := range partitionSet { + setToSend := new(MessageSet) + setSize := 0 + for _, msg := range msgSet { + if msg == nil { + continue + } - continue - } + var keyBytes, valBytes []byte + var err error + if msg.Key != nil { + if keyBytes, err = msg.Key.Encode(); err != nil { + f.parent.returnError(msg, err) + continue + } + } + if msg.Value != nil { + if valBytes, err = msg.Value.Encode(); err != nil { + f.parent.returnError(msg, err) + continue + } + } - partitionSet := msgSets[msg.Topic] - if partitionSet == nil { - partitionSet = make(map[int32][]*ProducerMessage) - msgSets[msg.Topic] = partitionSet - } + if f.parent.conf.Producer.Compression != CompressionNone && setSize+msg.byteSize() > f.parent.conf.Producer.MaxMessageBytes { + // compression causes message-sets to be wrapped as single messages, which have tighter + // size requirements, so we have to respect those limits + valBytes, err := encode(setToSend) + if err != nil { + Logger.Println(err) // if this happens, it's basically our fault. + panic(err) + } + req.AddMessage(topic, partition, &Message{Codec: f.parent.conf.Producer.Compression, Key: nil, Value: valBytes}) + setToSend = new(MessageSet) + setSize = 0 + } + setSize += msg.byteSize() - partitionSet[msg.Partition] = append(partitionSet[msg.Partition], msg) + setToSend.addMessage(&Message{Codec: CompressionNone, Key: keyBytes, Value: valBytes}) + empty = false + } + + if f.parent.conf.Producer.Compression == CompressionNone { + req.AddSet(topic, partition, setToSend) + } else { + valBytes, err := encode(setToSend) + if err != nil { + Logger.Println(err) // if this happens, it's basically our fault. + panic(err) + } + req.AddMessage(topic, partition, &Message{Codec: f.parent.conf.Producer.Compression, Key: nil, Value: valBytes}) + } + } } - return msgSets + if empty { + return nil + } + return req } func (f *flusher) parseResponse(msgSets map[string]map[int32][]*ProducerMessage, response *ProduceResponse) { @@ -708,7 +782,7 @@ func (f *flusher) parseResponse(msgSets map[string]map[int32][]*ProducerMessage, for i := range msgs { msgs[i].Offset = block.Offset + int64(i) } - f.parent.returnSuccesses(msgs) + f.parent.returnSuccesses(msgs, nil) // Retriable errors case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable, ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend: @@ -776,68 +850,6 @@ func (p *asyncProducer) shutdown() { close(p.successes) } -func (p *asyncProducer) buildRequest(batch map[string]map[int32][]*ProducerMessage) *ProduceRequest { - - req := &ProduceRequest{RequiredAcks: p.conf.Producer.RequiredAcks, Timeout: int32(p.conf.Producer.Timeout / time.Millisecond)} - empty := true - - for topic, partitionSet := range batch { - for partition, msgSet := range partitionSet { - setToSend := new(MessageSet) - setSize := 0 - for _, msg := range msgSet { - var keyBytes, valBytes []byte - var err error - if msg.Key != nil { - if keyBytes, err = msg.Key.Encode(); err != nil { - p.returnError(msg, err) - continue - } - } - if msg.Value != nil { - if valBytes, err = msg.Value.Encode(); err != nil { - p.returnError(msg, err) - continue - } - } - - if p.conf.Producer.Compression != CompressionNone && setSize+msg.byteSize() > p.conf.Producer.MaxMessageBytes { - // compression causes message-sets to be wrapped as single messages, which have tighter - // size requirements, so we have to respect those limits - valBytes, err := encode(setToSend) - if err != nil { - Logger.Println(err) // if this happens, it's basically our fault. - panic(err) - } - req.AddMessage(topic, partition, &Message{Codec: p.conf.Producer.Compression, Key: nil, Value: valBytes}) - setToSend = new(MessageSet) - setSize = 0 - } - setSize += msg.byteSize() - - setToSend.addMessage(&Message{Codec: CompressionNone, Key: keyBytes, Value: valBytes}) - empty = false - } - - if p.conf.Producer.Compression == CompressionNone { - req.AddSet(topic, partition, setToSend) - } else { - valBytes, err := encode(setToSend) - if err != nil { - Logger.Println(err) // if this happens, it's basically our fault. - panic(err) - } - req.AddMessage(topic, partition, &Message{Codec: p.conf.Producer.Compression, Key: nil, Value: valBytes}) - } - } - } - - if empty { - return nil - } - return req -} - func (p *asyncProducer) returnError(msg *ProducerMessage, err error) { msg.clear() pErr := &ProducerError{Msg: msg, Err: err} @@ -857,7 +869,7 @@ func (p *asyncProducer) returnErrors(batch []*ProducerMessage, err error) { } } -func (p *asyncProducer) returnSuccesses(batch []*ProducerMessage) { +func (p *asyncProducer) returnSuccesses(batch []*ProducerMessage, unused error) { for _, msg := range batch { if msg == nil { continue