Shuffle messages less in the producer. #513

Closed · wants to merge 1 commit
async_producer.go: 220 changes (116 additions, 104 deletions)
@@ -490,13 +490,14 @@ func (pp *partitionProducer) updateLeader() error {
// one per broker, constructs both an aggregator and a flusher
func (p *asyncProducer) newBrokerProducer(broker *Broker) chan<- *ProducerMessage {
input := make(chan *ProducerMessage)
bridge := make(chan []*ProducerMessage)
bridge := make(chan messageBuffer)

a := &aggregator{
parent: p,
broker: broker,
input: input,
output: bridge,
buffer: make(messageBuffer),
}
go withRecover(a.run)

@@ -511,21 +512,31 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) chan<- *ProducerMessage
return input
}

type messageBuffer map[string]map[int32][]*ProducerMessage

func (mb messageBuffer) each(action func([]*ProducerMessage, error), err error) {
for _, partitions := range mb {
for _, messages := range partitions {
action(messages, err)
}
}
}
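
The messageBuffer type introduced just above appears to be the core of this change: instead of buffering a flat []*ProducerMessage that the flusher later regroups, the aggregator keeps messages bucketed by topic and then partition as they arrive, and each applies a callback to every per-partition slice. Below is a minimal standalone sketch of that shape; the trimmed-down ProducerMessage and the demo topics are illustrative stand-ins, not part of this diff.

```go
package main

import "fmt"

// Trimmed-down stand-in for sarama's ProducerMessage (illustrative only).
type ProducerMessage struct {
	Topic     string
	Partition int32
	Value     string
}

// Same shape as the messageBuffer in this PR: topic -> partition -> messages.
type messageBuffer map[string]map[int32][]*ProducerMessage

// add buckets a message under its topic/partition, creating the inner map
// lazily, mirroring what the aggregator's input case does in the diff.
func (mb messageBuffer) add(msg *ProducerMessage) {
	if mb[msg.Topic] == nil {
		mb[msg.Topic] = make(map[int32][]*ProducerMessage)
	}
	mb[msg.Topic][msg.Partition] = append(mb[msg.Topic][msg.Partition], msg)
}

// each applies one action per partition slice, the way the flusher uses it
// for retryMessages / returnErrors / returnSuccesses.
func (mb messageBuffer) each(action func([]*ProducerMessage, error), err error) {
	for _, partitions := range mb {
		for _, messages := range partitions {
			action(messages, err)
		}
	}
}

func main() {
	buf := make(messageBuffer)
	buf.add(&ProducerMessage{Topic: "logs", Partition: 0, Value: "a"})
	buf.add(&ProducerMessage{Topic: "logs", Partition: 1, Value: "b"})
	buf.add(&ProducerMessage{Topic: "metrics", Partition: 0, Value: "c"})

	buf.each(func(msgs []*ProducerMessage, err error) {
		fmt.Printf("partition batch of %d (err=%v)\n", len(msgs), err)
	}, nil)
}
```
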

// groups messages together into appropriately-sized batches for sending to the broker
// based on https://godoc.org/github.com/eapache/channels#BatchingChannel
type aggregator struct {
parent *asyncProducer
broker *Broker
input <-chan *ProducerMessage
output chan<- []*ProducerMessage
output chan<- messageBuffer

buffer []*ProducerMessage
bufferBytes int
timer <-chan time.Time
buffer messageBuffer
bufferCount, bufferBytes int
timer <-chan time.Time
}

func (a *aggregator) run() {
var output chan<- []*ProducerMessage
var output chan<- messageBuffer

for {
select {
@@ -541,7 +552,11 @@ func (a *aggregator) run() {
output = nil
}

a.buffer = append(a.buffer, msg)
if a.buffer[msg.Topic] == nil {
a.buffer[msg.Topic] = make(map[int32][]*ProducerMessage)
}
a.buffer[msg.Topic][msg.Partition] = append(a.buffer[msg.Topic][msg.Partition], msg)
a.bufferCount += 1
a.bufferBytes += msg.byteSize()

if a.readyToFlush(msg) {
@@ -573,7 +588,7 @@ func (a *aggregator) wouldOverflow(msg *ProducerMessage) bool {
case a.parent.conf.Producer.Compression != CompressionNone && a.bufferBytes+msg.byteSize() >= a.parent.conf.Producer.MaxMessageBytes:
return true
// Would we overflow simply in number of messages?
case a.parent.conf.Producer.Flush.MaxMessages > 0 && len(a.buffer) >= a.parent.conf.Producer.Flush.MaxMessages:
case a.parent.conf.Producer.Flush.MaxMessages > 0 && a.bufferCount >= a.parent.conf.Producer.Flush.MaxMessages:
return true
default:
return false
@@ -589,7 +604,7 @@ func (a *aggregator) readyToFlush(msg *ProducerMessage) bool {
case msg.flags&chaser == chaser:
return true
// If we've passed the message trigger-point
case a.parent.conf.Producer.Flush.Messages > 0 && len(a.buffer) >= a.parent.conf.Producer.Flush.Messages:
case a.parent.conf.Producer.Flush.Messages > 0 && a.bufferCount >= a.parent.conf.Producer.Flush.Messages:
return true
// If we've passed the byte trigger-point
case a.parent.conf.Producer.Flush.Bytes > 0 && a.bufferBytes >= a.parent.conf.Producer.Flush.Bytes:
@@ -601,15 +616,16 @@ func (a *aggregator) readyToFlush(msg *ProducerMessage) bool {

func (a *aggregator) reset() {
a.timer = nil
a.buffer = nil
a.buffer = make(messageBuffer)
a.bufferBytes = 0
a.bufferCount = 0
}

// takes a batch at a time from the aggregator and sends to the broker
type flusher struct {
parent *asyncProducer
broker *Broker
input <-chan []*ProducerMessage
input <-chan messageBuffer

currentRetries map[string]map[int32]error
}
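
For context on the limits checked in wouldOverflow and readyToFlush above: they come from the producer configuration, not from the aggregator itself. A hedged usage sketch follows, assuming the Producer.Flush.*, Producer.MaxMessageBytes, and Producer.Compression fields and the Shopify/sarama import path of this era; treat the specific values and broker address as placeholders.

```go
package main

import (
	"log"
	"time"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()

	// Trigger points consulted by aggregator.readyToFlush:
	config.Producer.Flush.Messages = 64                      // flush once 64 messages are buffered...
	config.Producer.Flush.Bytes = 1 << 20                    // ...or once roughly 1 MiB is buffered...
	config.Producer.Flush.Frequency = 500 * time.Millisecond // ...or when the timer fires.

	// Hard limit consulted by aggregator.wouldOverflow:
	config.Producer.Flush.MaxMessages = 1024 // never hold more than this many messages per broker
	// Producer.MaxMessageBytes and Producer.Compression also factor into the overflow check.

	producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatal(err)
	}
	defer func() { _ = producer.Close() }()

	producer.Input() <- &sarama.ProducerMessage{
		Topic: "logs",
		Value: sarama.StringEncoder("hello"),
	}
}
```
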
@@ -621,12 +637,12 @@ func (f *flusher) run() {

for batch := range f.input {
if closing != nil {
f.parent.retryMessages(batch, closing)
batch.each(f.parent.retryMessages, closing)
continue
}

msgSets := f.groupAndFilter(batch)
request := f.parent.buildRequest(msgSets)
f.filter(batch)
request := f.buildRequest(batch)
if request == nil {
continue
}
@@ -637,58 +653,116 @@ func (f *flusher) run() {
case nil:
break
case PacketEncodingError:
f.parent.returnErrors(batch, err)
batch.each(f.parent.returnErrors, err)
continue
default:
Logger.Printf("producer/flusher/%d state change to [closing] because %s\n", f.broker.ID(), err)
f.parent.abandonBrokerConnection(f.broker)
_ = f.broker.Close()
closing = err
f.parent.retryMessages(batch, err)
batch.each(f.parent.retryMessages, err)
continue
}

if response == nil {
// this only happens when RequiredAcks is NoResponse, so we have to assume success
f.parent.returnSuccesses(batch)
batch.each(f.parent.returnSuccesses, nil)
continue
}

f.parseResponse(msgSets, response)
f.parseResponse(batch, response)
}
Logger.Printf("producer/flusher/%d shut down\n", f.broker.ID())
}

func (f *flusher) groupAndFilter(batch []*ProducerMessage) map[string]map[int32][]*ProducerMessage {
msgSets := make(map[string]map[int32][]*ProducerMessage)
func (f *flusher) filter(batch messageBuffer) {
for topic, partitions := range batch {
for partition, messages := range partitions {
for i, msg := range messages {
if f.currentRetries[topic] != nil && f.currentRetries[topic][partition] != nil {
// we're currently retrying this partition so we need to filter out this message
f.parent.retryMessages([]*ProducerMessage{msg}, f.currentRetries[topic][partition])
messages[i] = nil

if msg.flags&chaser == chaser {
// ...but now we can start processing future messages again
Logger.Printf("producer/flusher/%d state change to [normal] on %s/%d\n",
f.broker.ID(), topic, partition)
delete(f.currentRetries[topic], partition)
}
}
}
}
}
}

for i, msg := range batch {
func (f *flusher) buildRequest(batch map[string]map[int32][]*ProducerMessage) *ProduceRequest {

if f.currentRetries[msg.Topic] != nil && f.currentRetries[msg.Topic][msg.Partition] != nil {
// we're currently retrying this partition so we need to filter out this message
f.parent.retryMessages([]*ProducerMessage{msg}, f.currentRetries[msg.Topic][msg.Partition])
batch[i] = nil
req := &ProduceRequest{
RequiredAcks: f.parent.conf.Producer.RequiredAcks,
Timeout: int32(f.parent.conf.Producer.Timeout / time.Millisecond),
}
empty := true

if msg.flags&chaser == chaser {
// ...but now we can start processing future messages again
Logger.Printf("producer/flusher/%d state change to [normal] on %s/%d\n",
f.broker.ID(), msg.Topic, msg.Partition)
delete(f.currentRetries[msg.Topic], msg.Partition)
}
for topic, partitionSet := range batch {
for partition, msgSet := range partitionSet {
setToSend := new(MessageSet)
setSize := 0
for _, msg := range msgSet {
if msg == nil {
continue
}

continue
}
var keyBytes, valBytes []byte
var err error
if msg.Key != nil {
if keyBytes, err = msg.Key.Encode(); err != nil {
f.parent.returnError(msg, err)
continue
}
}
if msg.Value != nil {
if valBytes, err = msg.Value.Encode(); err != nil {
f.parent.returnError(msg, err)
continue
}
}

partitionSet := msgSets[msg.Topic]
if partitionSet == nil {
partitionSet = make(map[int32][]*ProducerMessage)
msgSets[msg.Topic] = partitionSet
}
if f.parent.conf.Producer.Compression != CompressionNone && setSize+msg.byteSize() > f.parent.conf.Producer.MaxMessageBytes {
// compression causes message-sets to be wrapped as single messages, which have tighter
// size requirements, so we have to respect those limits
valBytes, err := encode(setToSend)
if err != nil {
Logger.Println(err) // if this happens, it's basically our fault.
panic(err)
}
req.AddMessage(topic, partition, &Message{Codec: f.parent.conf.Producer.Compression, Key: nil, Value: valBytes})
setToSend = new(MessageSet)
setSize = 0
}
setSize += msg.byteSize()

partitionSet[msg.Partition] = append(partitionSet[msg.Partition], msg)
setToSend.addMessage(&Message{Codec: CompressionNone, Key: keyBytes, Value: valBytes})
empty = false
}

if f.parent.conf.Producer.Compression == CompressionNone {
req.AddSet(topic, partition, setToSend)
} else {
valBytes, err := encode(setToSend)
if err != nil {
Logger.Println(err) // if this happens, it's basically our fault.
panic(err)
}
req.AddMessage(topic, partition, &Message{Codec: f.parent.conf.Producer.Compression, Key: nil, Value: valBytes})
}
}
}

return msgSets
if empty {
return nil
}
return req
}
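
The compression comment above is the subtle part of the relocated buildRequest: with a codec enabled, the whole message-set gets encoded and wrapped as a single Message, so the MaxMessageBytes cap has to be enforced against the running size of the set rather than per message. Here is a minimal sketch of that size-capped chunking in isolation, with plain ints standing in for message byte sizes; it only loosely mirrors the setSize bookkeeping and is not Sarama code.

```go
package main

import "fmt"

// chunkBySize splits sizes into consecutive groups whose sum stays at or
// under maxBytes, wrapping up the current group whenever the next item
// would push it over the limit. Types here are illustrative stand-ins.
func chunkBySize(sizes []int, maxBytes int) [][]int {
	var chunks [][]int
	var current []int
	currentSize := 0
	for _, s := range sizes {
		if currentSize+s > maxBytes && len(current) > 0 {
			chunks = append(chunks, current) // wrap what we have so far
			current, currentSize = nil, 0
		}
		current = append(current, s)
		currentSize += s
	}
	if len(current) > 0 {
		chunks = append(chunks, current)
	}
	return chunks
}

func main() {
	fmt.Println(chunkBySize([]int{400, 300, 500, 200, 900}, 1000))
	// Output: [[400 300] [500 200] [900]]
}
```
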

func (f *flusher) parseResponse(msgSets map[string]map[int32][]*ProducerMessage, response *ProduceResponse) {
@@ -708,7 +782,7 @@ func (f *flusher) parseResponse(msgSets map[string]map[int32][]*ProducerMessage,
for i := range msgs {
msgs[i].Offset = block.Offset + int64(i)
}
f.parent.returnSuccesses(msgs)
f.parent.returnSuccesses(msgs, nil)
// Retriable errors
case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable,
ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
@@ -776,68 +850,6 @@ func (p *asyncProducer) shutdown() {
close(p.successes)
}

func (p *asyncProducer) buildRequest(batch map[string]map[int32][]*ProducerMessage) *ProduceRequest {

req := &ProduceRequest{RequiredAcks: p.conf.Producer.RequiredAcks, Timeout: int32(p.conf.Producer.Timeout / time.Millisecond)}
empty := true

for topic, partitionSet := range batch {
for partition, msgSet := range partitionSet {
setToSend := new(MessageSet)
setSize := 0
for _, msg := range msgSet {
var keyBytes, valBytes []byte
var err error
if msg.Key != nil {
if keyBytes, err = msg.Key.Encode(); err != nil {
p.returnError(msg, err)
continue
}
}
if msg.Value != nil {
if valBytes, err = msg.Value.Encode(); err != nil {
p.returnError(msg, err)
continue
}
}

if p.conf.Producer.Compression != CompressionNone && setSize+msg.byteSize() > p.conf.Producer.MaxMessageBytes {
// compression causes message-sets to be wrapped as single messages, which have tighter
// size requirements, so we have to respect those limits
valBytes, err := encode(setToSend)
if err != nil {
Logger.Println(err) // if this happens, it's basically our fault.
panic(err)
}
req.AddMessage(topic, partition, &Message{Codec: p.conf.Producer.Compression, Key: nil, Value: valBytes})
setToSend = new(MessageSet)
setSize = 0
}
setSize += msg.byteSize()

setToSend.addMessage(&Message{Codec: CompressionNone, Key: keyBytes, Value: valBytes})
empty = false
}

if p.conf.Producer.Compression == CompressionNone {
req.AddSet(topic, partition, setToSend)
} else {
valBytes, err := encode(setToSend)
if err != nil {
Logger.Println(err) // if this happens, it's basically our fault.
panic(err)
}
req.AddMessage(topic, partition, &Message{Codec: p.conf.Producer.Compression, Key: nil, Value: valBytes})
}
}
}

if empty {
return nil
}
return req
}

func (p *asyncProducer) returnError(msg *ProducerMessage, err error) {
msg.clear()
pErr := &ProducerError{Msg: msg, Err: err}
@@ -857,7 +869,7 @@ func (p *asyncProducer) returnErrors(batch []*ProducerMessage, err error) {
}
}

func (p *asyncProducer) returnSuccesses(batch []*ProducerMessage) {
func (p *asyncProducer) returnSuccesses(batch []*ProducerMessage, unused error) {
for _, msg := range batch {
if msg == nil {
continue
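
Finally, the returnSuccesses and returnErrors paths shown in this diff feed the producer's Successes() and Errors() channels. A short consumer-side sketch, assuming Producer.Return.Successes is enabled (Return.Errors is on by default); the broker address and topic are placeholders.

```go
package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true // required to receive on Successes()

	producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatal(err)
	}

	go func() {
		producer.Input() <- &sarama.ProducerMessage{
			Topic: "logs",
			Value: sarama.StringEncoder("hello"),
		}
	}()

	// Wait for one outcome; a real application keeps draining both channels
	// until they are closed.
	select {
	case msg := <-producer.Successes():
		log.Printf("delivered to %s/%d at offset %d", msg.Topic, msg.Partition, msg.Offset)
	case perr := <-producer.Errors():
		log.Printf("failed: %v", perr.Err)
	}

	producer.AsyncClose()
}
```
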