
Clean up aggregator via helper methods #511

Merged · 1 commit · Aug 13, 2015
106 changes: 64 additions & 42 deletions async_producer.go
@@ -9,10 +9,6 @@ import (
     "github.com/eapache/queue"
 )
 
-func forceFlushThreshold() int {
-    return int(MaxRequestSize - (10 * 1024)) // 10KiB is safety room for misc. overhead, we might want to calculate this more precisely?
-}
-
 // AsyncProducer publishes Kafka messages using a non-blocking API. It routes messages
 // to the correct broker for the provided topic-partition, refreshing metadata as appropriate,
 // and parses responses for errors. You must read from the Errors() channel or the
@@ -491,6 +487,7 @@ func (pp *partitionProducer) updateLeader() error {
     })
 }
 
+// one per broker, constructs both an aggregator and a flusher
 func (p *asyncProducer) newBrokerProducer(broker *Broker) chan<- *ProducerMessage {
     input := make(chan *ProducerMessage)
     bridge := make(chan []*ProducerMessage)
@@ -514,28 +511,21 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) chan<- *ProducerMessage {
     return input
 }
 
-// one per broker
 // groups messages together into appropriately-sized batches for sending to the broker
 // based on https://godoc.org/github.com/eapache/channels#BatchingChannel
 type aggregator struct {
     parent *asyncProducer
     broker *Broker
     input  <-chan *ProducerMessage
     output chan<- []*ProducerMessage
+
+    buffer      []*ProducerMessage
+    bufferBytes int
+    timer       <-chan time.Time
 }
 
 func (a *aggregator) run() {
-    var (
-        timer            <-chan time.Time
-        buffer           []*ProducerMessage
-        flushTriggered   chan<- []*ProducerMessage
-        bytesAccumulated int
-        defaultFlush     bool
-    )
-
-    if a.parent.conf.Producer.Flush.Frequency == 0 && a.parent.conf.Producer.Flush.Bytes == 0 && a.parent.conf.Producer.Flush.Messages == 0 {
-        defaultFlush = true
-    }
+    var output chan<- []*ProducerMessage
 
     for {
         select {
@@ -544,45 +534,77 @@ func (a *aggregator) run() {
                 goto shutdown
             }
 
-            if (bytesAccumulated+msg.byteSize() >= forceFlushThreshold()) ||
-                (a.parent.conf.Producer.Compression != CompressionNone && bytesAccumulated+msg.byteSize() >= a.parent.conf.Producer.MaxMessageBytes) ||
-                (a.parent.conf.Producer.Flush.MaxMessages > 0 && len(buffer) >= a.parent.conf.Producer.Flush.MaxMessages) {
+            if a.wouldOverflow(msg) {
                 Logger.Printf("producer/aggregator/%d maximum request accumulated, forcing blocking flush\n", a.broker.ID())
-                a.output <- buffer
-                timer = nil
-                buffer = nil
-                flushTriggered = nil
-                bytesAccumulated = 0
+                a.output <- a.buffer
+                a.reset()
+                output = nil
             }
 
-            buffer = append(buffer, msg)
-            bytesAccumulated += msg.byteSize()
+            a.buffer = append(a.buffer, msg)
+            a.bufferBytes += msg.byteSize()
 
-            if defaultFlush ||
-                msg.flags&chaser == chaser ||
-                (a.parent.conf.Producer.Flush.Messages > 0 && len(buffer) >= a.parent.conf.Producer.Flush.Messages) ||
-                (a.parent.conf.Producer.Flush.Bytes > 0 && bytesAccumulated >= a.parent.conf.Producer.Flush.Bytes) {
-                flushTriggered = a.output
-            } else if a.parent.conf.Producer.Flush.Frequency > 0 && timer == nil {
-                timer = time.After(a.parent.conf.Producer.Flush.Frequency)
+            if a.readyToFlush(msg) {
+                output = a.output
+            } else if a.parent.conf.Producer.Flush.Frequency > 0 && a.timer == nil {
+                a.timer = time.After(a.parent.conf.Producer.Flush.Frequency)
             }
-        case <-timer:
-            flushTriggered = a.output
-        case flushTriggered <- buffer:
-            timer = nil
-            buffer = nil
-            flushTriggered = nil
-            bytesAccumulated = 0
+        case <-a.timer:
+            output = a.output
+        case output <- a.buffer:
+            a.reset()
+            output = nil
         }
     }
 
 shutdown:
-    if len(buffer) > 0 {
-        a.output <- buffer
+    if len(a.buffer) > 0 {
+        a.output <- a.buffer
     }
     close(a.output)
 }
+
+func (a *aggregator) wouldOverflow(msg *ProducerMessage) bool {
+    switch {
+    // Would we overflow our maximum possible size-on-the-wire? 10KiB is arbitrary overhead for safety.
+    case a.bufferBytes+msg.byteSize() >= int(MaxRequestSize-(10*1024)):
[Inline review thread on the preceding line]
Contributor: Can we trust the compiler to optimize the static calculation into a constant?
Contributor Author: We weren't before and nobody ever complained...
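For context on this question: MaxRequestSize is a mutable package-level variable in this codebase, so the compiler cannot fold int(MaxRequestSize - (10 * 1024)) into a true constant; each call costs one load and one subtraction, which is cheap. A minimal, hypothetical sketch of hoisting it anyway (the name forceFlushThresholdBytes is made up, and this variant would miss any later changes to MaxRequestSize):

    // Hypothetical, not part of this PR: compute the threshold once at
    // package init. Assumes MaxRequestSize is not reassigned afterwards;
    // the code in this diff deliberately recomputes it instead.
    var forceFlushThresholdBytes = int(MaxRequestSize - (10 * 1024))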

+        return true
+    // Would we overflow the size-limit of a compressed message-batch?
+    case a.parent.conf.Producer.Compression != CompressionNone && a.bufferBytes+msg.byteSize() >= a.parent.conf.Producer.MaxMessageBytes:
+        return true
+    // Would we overflow simply in number of messages?
+    case a.parent.conf.Producer.Flush.MaxMessages > 0 && len(a.buffer) >= a.parent.conf.Producer.Flush.MaxMessages:
+        return true
+    default:
+        return false
+    }
+}
+
+func (a *aggregator) readyToFlush(msg *ProducerMessage) bool {
+    switch {
+    // If all three config values are 0, we always flush as-fast-as-possible
+    case a.parent.conf.Producer.Flush.Frequency == 0 && a.parent.conf.Producer.Flush.Bytes == 0 && a.parent.conf.Producer.Flush.Messages == 0:
+        return true
+    // If the message is a chaser we must flush to maintain the state-machine
+    case msg.flags&chaser == chaser:
+        return true
+    // If we've passed the message trigger-point
+    case a.parent.conf.Producer.Flush.Messages > 0 && len(a.buffer) >= a.parent.conf.Producer.Flush.Messages:
+        return true
+    // If we've passed the byte trigger-point
+    case a.parent.conf.Producer.Flush.Bytes > 0 && a.bufferBytes >= a.parent.conf.Producer.Flush.Bytes:
+        return true
+    default:
+        return false
+    }
+}
+
+func (a *aggregator) reset() {
+    a.timer = nil
+    a.buffer = nil
+    a.bufferBytes = 0
+}

 // takes a batch at a time from the aggregator and sends to the broker
 type flusher struct {
     parent *asyncProducer
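The rewritten run() loop above leans on a standard Go idiom: a send on a nil channel blocks forever, so assigning output = a.output arms the flush case of the select and output = nil disarms it. Below is a self-contained sketch of that pattern under hypothetical names (none of them taken from this diff):

    package main

    import (
        "fmt"
        "time"
    )

    // Demonstrates arming/disarming a select case via a nil channel,
    // the same trick the aggregator uses for its flush trigger.
    func main() {
        out := make(chan []int)
        var output chan []int // nil: the send case below can never fire
        var buffer []int
        ticks := time.Tick(10 * time.Millisecond)

        go func() {
            for batch := range out {
                fmt.Println("flushed:", batch)
            }
        }()

        for i := 0; i < 20; i++ {
            select {
            case <-ticks:
                buffer = append(buffer, i)
                if len(buffer) >= 3 { // stand-in for readyToFlush
                    output = out // arm the flush case
                }
            case output <- buffer: // selectable only while armed
                buffer = nil // stand-in for reset
                output = nil // disarm until the next trigger
            }
        }
        time.Sleep(50 * time.Millisecond) // let the receiver print
    }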
8 changes: 4 additions & 4 deletions config.go
@@ -180,11 +180,11 @@ func (c *Config) Validate() error {
     if c.Producer.RequiredAcks > 1 {
         Logger.Println("Producer.RequiredAcks > 1 is deprecated and will raise an exception with kafka >= 0.8.2.0.")
     }
-    if c.Producer.MaxMessageBytes >= forceFlushThreshold() {
-        Logger.Println("Producer.MaxMessageBytes is too close to MaxRequestSize; it will be ignored.")
+    if c.Producer.MaxMessageBytes >= int(MaxRequestSize) {
+        Logger.Println("Producer.MaxMessageBytes is larger than MaxRequestSize; it will be ignored.")
     }
-    if c.Producer.Flush.Bytes >= forceFlushThreshold() {
-        Logger.Println("Producer.Flush.Bytes is too close to MaxRequestSize; it will be ignored.")
+    if c.Producer.Flush.Bytes >= int(MaxRequestSize) {
+        Logger.Println("Producer.Flush.Bytes is larger than MaxRequestSize; it will be ignored.")
     }
     if c.Producer.Timeout%time.Millisecond != 0 {
         Logger.Println("Producer.Timeout only supports millisecond resolution; nanoseconds will be truncated.")
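For illustration, here is one way these Validate() warnings might surface to an application. A minimal sketch assuming the import path of this era (github.com/Shopify/sarama); the specific value is made up:

    package main

    import (
        "log"
        "os"

        "github.com/Shopify/sarama"
    )

    func main() {
        // Route sarama's Logger to stderr so Validate's warnings are visible.
        sarama.Logger = log.New(os.Stderr, "[sarama] ", log.LstdFlags)

        config := sarama.NewConfig()
        // Deliberately larger than MaxRequestSize (100 MiB by default):
        // Validate() logs that the value will be ignored, but still succeeds.
        config.Producer.Flush.Bytes = int(sarama.MaxRequestSize) + 1

        if err := config.Validate(); err != nil {
            log.Fatal(err)
        }
    }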