Refactor the producer, part 2
Second stand-alone chunk extracted from #544 (first chunk: #549). This uses the
`produceSet` struct in the aggregator as well, and turns the `wouldOverflow` and
`readyToFlush` helpers into methods on the `produceSet`.
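
For reference, the `produceSet` that both halves now share was introduced in the
first chunk (#549) and does not appear in this diff. The sketch below is only a
rough reconstruction of its shape, inferred from the methods added here; the
inner `partitionSet` type and the exact field names are assumptions, not quotes
from the source.

    // Rough sketch, not the real definition (see #549 for that).
    type partitionSet struct { // inner type name assumed for illustration
        msgs        []*ProducerMessage
        bufferBytes int // encoded size of this partition's batch
    }

    type produceSet struct {
        parent      *asyncProducer
        msgs        map[string]map[int32]*partitionSet // topic -> partition -> batch
        bufferBytes int // encoded size across all partitions
        bufferCount int // number of buffered messages
    }

    func newProduceSet(parent *asyncProducer) *produceSet {
        return &produceSet{
            parent: parent,
            msgs:   make(map[string]map[int32]*partitionSet),
        }
    }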

Knock-on effects:
 - now that we do per-partition size tracking in the aggregator we can do much
   more precise overflow checking (see the changed compressed-message-batch-size-limit
   case in `wouldOverflow`), which will be more efficient in high-volume
   scenarios; a worked example follows this list
 - since the produceSet encodes immediately, messages which fail to encode are
   now rejected in the aggregator and no longer count towards batch size
 - we still have to iterate the messages in the flusher in order to reject those
   which need retrying due to the state machine; for simplicity they are added
   to a second produceSet for now, which means every message gets encoded twice;
   this is a definite (if temporary) performance regression that will go away
   again in part 3 of this refactor
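
To make the first bullet concrete, here is a standalone toy with made-up numbers
(not Sarama code); it assumes compression is enabled and `MaxMessageBytes` is
left at its 1000000 default:

    package main

    import "fmt"

    func main() {
        const maxMessageBytes = 1000000 // Producer.MaxMessageBytes default (assumed unchanged)

        // Hypothetical broker state: 300 kB of encoded messages already buffered
        // for each of five partitions, and a 50 kB message arriving for partition 2.
        perPartition := map[int32]int{0: 300000, 1: 300000, 2: 300000, 3: 300000, 4: 300000}
        total := 0
        for _, size := range perPartition {
            total += size
        }
        next := 50000

        // Old check: the whole buffer was compared against the compressed
        // message-batch limit, so this message forced a premature flush.
        fmt.Println("old wouldOverflow:", total+next >= maxMessageBytes) // true

        // New check: only the target partition's batch is compared, so the
        // aggregator keeps filling the request.
        fmt.Println("new wouldOverflow:", perPartition[2]+next >= maxMessageBytes) // false
    }

The separate size-on-the-wire check against `MaxRequestSize` is unchanged and
still uses the total across partitions.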
eapache committed Oct 14, 2015
1 parent f0e1e8a commit 733c74f
Showing 2 changed files with 80 additions and 70 deletions.
146 changes: 78 additions & 68 deletions async_producer.go
@@ -508,13 +508,14 @@ func (pp *partitionProducer) updateLeader() error {
// one per broker, constructs both an aggregator and a flusher
func (p *asyncProducer) newBrokerProducer(broker *Broker) chan<- *ProducerMessage {
	input := make(chan *ProducerMessage)
-	bridge := make(chan []*ProducerMessage)
+	bridge := make(chan *produceSet)

	a := &aggregator{
		parent: p,
		broker: broker,
		input:  input,
		output: bridge,
+		buffer: newProduceSet(p),
	}
	go withRecover(a.run)

@@ -535,15 +536,14 @@ type aggregator struct {
	parent *asyncProducer
	broker *Broker
	input  <-chan *ProducerMessage
-	output chan<- []*ProducerMessage
+	output chan<- *produceSet

-	buffer      []*ProducerMessage
-	bufferBytes int
-	timer       <-chan time.Time
+	buffer *produceSet
+	timer  <-chan time.Time
}

func (a *aggregator) run() {
-	var output chan<- []*ProducerMessage
+	var output chan<- *produceSet

	for {
		select {
@@ -552,17 +552,19 @@ func (a *aggregator) run() {
				goto shutdown
			}

-			if a.wouldOverflow(msg) {
+			if a.buffer.wouldOverflow(msg) {
				Logger.Printf("producer/aggregator/%d maximum request accumulated, forcing blocking flush\n", a.broker.ID())
				a.output <- a.buffer
				a.reset()
				output = nil
			}

-			a.buffer = append(a.buffer, msg)
-			a.bufferBytes += msg.byteSize()
+			if err := a.buffer.add(msg); err != nil {
+				a.parent.returnError(msg, err)
+				continue
+			}

-			if a.readyToFlush(msg) {
+			if a.buffer.readyToFlush(msg) {
				output = a.output
			} else if a.parent.conf.Producer.Flush.Frequency > 0 && a.timer == nil {
				a.timer = time.After(a.parent.conf.Producer.Flush.Frequency)
@@ -576,58 +578,22 @@ }
	}

shutdown:
-	if len(a.buffer) > 0 {
+	if !a.buffer.empty() {
		a.output <- a.buffer
	}
	close(a.output)
}

-func (a *aggregator) wouldOverflow(msg *ProducerMessage) bool {
-	switch {
-	// Would we overflow our maximum possible size-on-the-wire? 10KiB is arbitrary overhead for safety.
-	case a.bufferBytes+msg.byteSize() >= int(MaxRequestSize-(10*1024)):
-		return true
-	// Would we overflow the size-limit of a compressed message-batch?
-	case a.parent.conf.Producer.Compression != CompressionNone && a.bufferBytes+msg.byteSize() >= a.parent.conf.Producer.MaxMessageBytes:
-		return true
-	// Would we overflow simply in number of messages?
-	case a.parent.conf.Producer.Flush.MaxMessages > 0 && len(a.buffer) >= a.parent.conf.Producer.Flush.MaxMessages:
-		return true
-	default:
-		return false
-	}
-}
-
-func (a *aggregator) readyToFlush(msg *ProducerMessage) bool {
-	switch {
-	// If all three config values are 0, we always flush as-fast-as-possible
-	case a.parent.conf.Producer.Flush.Frequency == 0 && a.parent.conf.Producer.Flush.Bytes == 0 && a.parent.conf.Producer.Flush.Messages == 0:
-		return true
-	// If the messages is a chaser we must flush to maintain the state-machine
-	case msg.flags&chaser == chaser:
-		return true
-	// If we've passed the message trigger-point
-	case a.parent.conf.Producer.Flush.Messages > 0 && len(a.buffer) >= a.parent.conf.Producer.Flush.Messages:
-		return true
-	// If we've passed the byte trigger-point
-	case a.parent.conf.Producer.Flush.Bytes > 0 && a.bufferBytes >= a.parent.conf.Producer.Flush.Bytes:
-		return true
-	default:
-		return false
-	}
-}
-
func (a *aggregator) reset() {
	a.timer = nil
-	a.buffer = nil
-	a.bufferBytes = 0
+	a.buffer = newProduceSet(a.parent)
}

// takes a batch at a time from the aggregator and sends to the broker
type flusher struct {
	parent *asyncProducer
	broker *Broker
-	input  <-chan []*ProducerMessage
+	input  <-chan *produceSet

	currentRetries map[string]map[int32]error
}
@@ -639,11 +605,13 @@ func (f *flusher) run() {

	for batch := range f.input {
		if closing != nil {
-			f.parent.retryMessages(batch, closing)
+			batch.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
+				f.parent.retryMessages(msgs, closing)
+			})
			continue
		}

-		set := f.groupAndFilter(batch)
+		set := f.filter(batch)
		if set.empty() {
			continue
		}
@@ -683,30 +651,32 @@
	Logger.Printf("producer/flusher/%d shut down\n", f.broker.ID())
}

-func (f *flusher) groupAndFilter(batch []*ProducerMessage) *produceSet {
+func (f *flusher) filter(batch *produceSet) *produceSet {
	set := newProduceSet(f.parent)

-	for _, msg := range batch {
+	batch.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
+		for _, msg := range msgs {

-		if f.currentRetries[msg.Topic] != nil && f.currentRetries[msg.Topic][msg.Partition] != nil {
-			// we're currently retrying this partition so we need to filter out this message
-			f.parent.retryMessages([]*ProducerMessage{msg}, f.currentRetries[msg.Topic][msg.Partition])
+			if f.currentRetries[msg.Topic] != nil && f.currentRetries[msg.Topic][msg.Partition] != nil {
+				// we're currently retrying this partition so we need to filter out this message
+				f.parent.retryMessages([]*ProducerMessage{msg}, f.currentRetries[msg.Topic][msg.Partition])

-			if msg.flags&chaser == chaser {
-				// ...but now we can start processing future messages again
-				Logger.Printf("producer/flusher/%d state change to [normal] on %s/%d\n",
-					f.broker.ID(), msg.Topic, msg.Partition)
-				delete(f.currentRetries[msg.Topic], msg.Partition)
-			}
+				if msg.flags&chaser == chaser {
+					// ...but now we can start processing future messages again
+					Logger.Printf("producer/flusher/%d state change to [normal] on %s/%d\n",
+						f.broker.ID(), msg.Topic, msg.Partition)
+					delete(f.currentRetries[msg.Topic], msg.Partition)
+				}

-			continue
-		}
+				continue
+			}

-		if err := set.add(msg); err != nil {
-			f.parent.returnError(msg, err)
-			continue
-		}
-	}
+			if err := set.add(msg); err != nil {
+				f.parent.returnError(msg, err)
+				continue
+			}
+		}
+	})

	return set
}
Expand Down Expand Up @@ -874,6 +844,46 @@ func (ps *produceSet) eachPartition(cb func(topic string, partition int32, msgs
	}
}

+func (ps *produceSet) wouldOverflow(msg *ProducerMessage) bool {
+	switch {
+	// Would we overflow our maximum possible size-on-the-wire? 10KiB is arbitrary overhead for safety.
+	case ps.bufferBytes+msg.byteSize() >= int(MaxRequestSize-(10*1024)):
+		return true
+	// Would we overflow the size-limit of a compressed message-batch for this partition?
+	case ps.parent.conf.Producer.Compression != CompressionNone &&
+		ps.msgs[msg.Topic] != nil && ps.msgs[msg.Topic][msg.Partition] != nil &&
+		ps.msgs[msg.Topic][msg.Partition].bufferBytes+msg.byteSize() >= ps.parent.conf.Producer.MaxMessageBytes:
+		return true
+	// Would we overflow simply in number of messages?
+	case ps.parent.conf.Producer.Flush.MaxMessages > 0 && ps.bufferCount >= ps.parent.conf.Producer.Flush.MaxMessages:
+		return true
+	default:
+		return false
+	}
+}
+
+func (ps *produceSet) readyToFlush(msg *ProducerMessage) bool {
+	switch {
+	// If we don't have any messages, nothing else matters
+	case ps.empty():
+		return false
+	// If all three config values are 0, we always flush as-fast-as-possible
+	case ps.parent.conf.Producer.Flush.Frequency == 0 && ps.parent.conf.Producer.Flush.Bytes == 0 && ps.parent.conf.Producer.Flush.Messages == 0:
+		return true
+	// If the messages is a chaser we must flush to maintain the state-machine
+	case msg.flags&chaser == chaser:
+		return true
+	// If we've passed the message trigger-point
+	case ps.parent.conf.Producer.Flush.Messages > 0 && ps.bufferCount >= ps.parent.conf.Producer.Flush.Messages:
+		return true
+	// If we've passed the byte trigger-point
+	case ps.parent.conf.Producer.Flush.Bytes > 0 && ps.bufferBytes >= ps.parent.conf.Producer.Flush.Bytes:
+		return true
+	default:
+		return false
+	}
+}
+
func (ps *produceSet) empty() bool {
	return ps.bufferCount == 0
}
4 changes: 2 additions & 2 deletions async_producer_test.go
@@ -320,7 +320,7 @@ func TestAsyncProducerEncoderFailures(t *testing.T) {
	leader.Returns(prodSuccess)

	config := NewConfig()
-	config.Producer.Flush.Messages = 3
+	config.Producer.Flush.Messages = 1
	config.Producer.Return.Successes = true
	config.Producer.Partitioner = NewManualPartitioner
	producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
@@ -330,8 +330,8 @@

	for flush := 0; flush < 3; flush++ {
		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(true), Value: flakyEncoder(false)}
-		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(true), Value: flakyEncoder(true)}
		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(false), Value: flakyEncoder(true)}
+		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(true), Value: flakyEncoder(true)}
		expectResults(t, producer, 1, 2)
	}
