Skip to content

Commit

Permalink
Merge pull request #511 from Shopify/cleanup-aggregator
Browse files Browse the repository at this point in the history
Clean up aggregator via helper methods
  • Loading branch information
eapache committed Aug 13, 2015
2 parents 85c7680 + 765b3b4 commit 6f82d15
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 46 deletions.
106 changes: 64 additions & 42 deletions async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,6 @@ import (
"github.com/eapache/queue"
)

// forceFlushThreshold returns the accumulated-request size at which the
// aggregator must flush no matter what the flush configuration says.
// 10KiB is reserved as safety room for miscellaneous protocol overhead;
// we might want to calculate this more precisely some day.
func forceFlushThreshold() int {
	const safetyRoom = 10 * 1024
	return int(MaxRequestSize) - safetyRoom
}

// AsyncProducer publishes Kafka messages using a non-blocking API. It routes messages
// to the correct broker for the provided topic-partition, refreshing metadata as appropriate,
// and parses responses for errors. You must read from the Errors() channel or the
Expand Down Expand Up @@ -491,6 +487,7 @@ func (pp *partitionProducer) updateLeader() error {
})
}

// one per broker, constructs both an aggregator and a flusher
func (p *asyncProducer) newBrokerProducer(broker *Broker) chan<- *ProducerMessage {
input := make(chan *ProducerMessage)
bridge := make(chan []*ProducerMessage)
Expand All @@ -514,28 +511,21 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) chan<- *ProducerMessag
return input
}

// one per broker
// groups messages together into appropriately-sized batches for sending to the broker
// based on https://godoc.org/github.com/eapache/channels#BatchingChannel
type aggregator struct {
// immutable plumbing, set once at construction
parent *asyncProducer
broker *Broker
input <-chan *ProducerMessage
output chan<- []*ProducerMessage

// batching state, mutated only by the run() goroutine
buffer []*ProducerMessage
bufferBytes int // running sum of byteSize() over buffer
timer <-chan time.Time // pending Flush.Frequency timeout; nil when no timed flush is armed
}

func (a *aggregator) run() {
var (
timer <-chan time.Time
buffer []*ProducerMessage
flushTriggered chan<- []*ProducerMessage
bytesAccumulated int
defaultFlush bool
)

if a.parent.conf.Producer.Flush.Frequency == 0 && a.parent.conf.Producer.Flush.Bytes == 0 && a.parent.conf.Producer.Flush.Messages == 0 {
defaultFlush = true
}
var output chan<- []*ProducerMessage

for {
select {
Expand All @@ -544,45 +534,77 @@ func (a *aggregator) run() {
goto shutdown
}

if (bytesAccumulated+msg.byteSize() >= forceFlushThreshold()) ||
(a.parent.conf.Producer.Compression != CompressionNone && bytesAccumulated+msg.byteSize() >= a.parent.conf.Producer.MaxMessageBytes) ||
(a.parent.conf.Producer.Flush.MaxMessages > 0 && len(buffer) >= a.parent.conf.Producer.Flush.MaxMessages) {
if a.wouldOverflow(msg) {
Logger.Printf("producer/aggregator/%d maximum request accumulated, forcing blocking flush\n", a.broker.ID())
a.output <- buffer
timer = nil
buffer = nil
flushTriggered = nil
bytesAccumulated = 0
a.output <- a.buffer
a.reset()
output = nil
}

buffer = append(buffer, msg)
bytesAccumulated += msg.byteSize()
a.buffer = append(a.buffer, msg)
a.bufferBytes += msg.byteSize()

if defaultFlush ||
msg.flags&chaser == chaser ||
(a.parent.conf.Producer.Flush.Messages > 0 && len(buffer) >= a.parent.conf.Producer.Flush.Messages) ||
(a.parent.conf.Producer.Flush.Bytes > 0 && bytesAccumulated >= a.parent.conf.Producer.Flush.Bytes) {
flushTriggered = a.output
} else if a.parent.conf.Producer.Flush.Frequency > 0 && timer == nil {
timer = time.After(a.parent.conf.Producer.Flush.Frequency)
if a.readyToFlush(msg) {
output = a.output
} else if a.parent.conf.Producer.Flush.Frequency > 0 && a.timer == nil {
a.timer = time.After(a.parent.conf.Producer.Flush.Frequency)
}
case <-timer:
flushTriggered = a.output
case flushTriggered <- buffer:
timer = nil
buffer = nil
flushTriggered = nil
bytesAccumulated = 0
case <-a.timer:
output = a.output
case output <- a.buffer:
a.reset()
output = nil
}
}

shutdown:
if len(buffer) > 0 {
a.output <- buffer
if len(a.buffer) > 0 {
a.output <- a.buffer
}
close(a.output)
}

// wouldOverflow reports whether adding msg to the current buffer would
// breach one of the hard batch limits, meaning the buffer must be flushed
// (blocking if necessary) before msg is appended.
func (a *aggregator) wouldOverflow(msg *ProducerMessage) bool {
	producer := a.parent.conf.Producer
	projectedBytes := a.bufferBytes + msg.byteSize()

	// Hard cap on our maximum possible size-on-the-wire; 10KiB is
	// arbitrary overhead for safety.
	if projectedBytes >= int(MaxRequestSize-(10*1024)) {
		return true
	}

	// Size limit of a single compressed message-batch.
	if producer.Compression != CompressionNone && projectedBytes >= producer.MaxMessageBytes {
		return true
	}

	// Plain limit on the number of buffered messages.
	return producer.Flush.MaxMessages > 0 && len(a.buffer) >= producer.Flush.MaxMessages
}

// readyToFlush reports whether the buffer (with msg already appended) has
// reached a configured trigger-point and should be offered to the flusher.
func (a *aggregator) readyToFlush(msg *ProducerMessage) bool {
	flush := a.parent.conf.Producer.Flush

	// With no flush settings at all we ship as fast as possible.
	if flush.Frequency == 0 && flush.Bytes == 0 && flush.Messages == 0 {
		return true
	}

	// A chaser message must be flushed to keep the state-machine moving.
	if msg.flags&chaser == chaser {
		return true
	}

	// Past the configured message-count trigger-point?
	if flush.Messages > 0 && len(a.buffer) >= flush.Messages {
		return true
	}

	// Past the configured byte-count trigger-point?
	return flush.Bytes > 0 && a.bufferBytes >= flush.Bytes
}

// reset clears all batching state after a buffer has been handed off,
// leaving the aggregator ready to accumulate its next batch.
func (a *aggregator) reset() {
	a.timer, a.buffer, a.bufferBytes = nil, nil, 0
}

// takes a batch at a time from the aggregator and sends to the broker
type flusher struct {
parent *asyncProducer
Expand Down
8 changes: 4 additions & 4 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,11 +180,11 @@ func (c *Config) Validate() error {
if c.Producer.RequiredAcks > 1 {
Logger.Println("Producer.RequiredAcks > 1 is deprecated and will raise an exception with kafka >= 0.8.2.0.")
}
if c.Producer.MaxMessageBytes >= forceFlushThreshold() {
Logger.Println("Producer.MaxMessageBytes is too close to MaxRequestSize; it will be ignored.")
if c.Producer.MaxMessageBytes >= int(MaxRequestSize) {
Logger.Println("Producer.MaxMessageBytes is larger than MaxRequestSize; it will be ignored.")
}
if c.Producer.Flush.Bytes >= forceFlushThreshold() {
Logger.Println("Producer.Flush.Bytes is too close to MaxRequestSize; it will be ignored.")
if c.Producer.Flush.Bytes >= int(MaxRequestSize) {
Logger.Println("Producer.Flush.Bytes is larger than MaxRequestSize; it will be ignored.")
}
if c.Producer.Timeout%time.Millisecond != 0 {
Logger.Println("Producer.Timeout only supports millisecond resolution; nanoseconds will be truncated.")
Expand Down

0 comments on commit 6f82d15

Please sign in to comment.