From 765b3b4f7d7c590481b7e34f091285bcf13b6d79 Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Thu, 13 Aug 2015 11:25:21 -0400 Subject: [PATCH] Clean up aggregator via helper methods Also get rid of forceFlushThreshold it does not need its own function --- async_producer.go | 106 ++++++++++++++++++++++++++++------------------ config.go | 8 ++-- 2 files changed, 68 insertions(+), 46 deletions(-) diff --git a/async_producer.go b/async_producer.go index e4e88ae93..8e229490f 100644 --- a/async_producer.go +++ b/async_producer.go @@ -9,10 +9,6 @@ import ( "github.com/eapache/queue" ) -func forceFlushThreshold() int { - return int(MaxRequestSize - (10 * 1024)) // 10KiB is safety room for misc. overhead, we might want to calculate this more precisely? -} - // AsyncProducer publishes Kafka messages using a non-blocking API. It routes messages // to the correct broker for the provided topic-partition, refreshing metadata as appropriate, // and parses responses for errors. You must read from the Errors() channel or the @@ -491,6 +487,7 @@ func (pp *partitionProducer) updateLeader() error { }) } +// one per broker, constructs both an aggregator and a flusher func (p *asyncProducer) newBrokerProducer(broker *Broker) chan<- *ProducerMessage { input := make(chan *ProducerMessage) bridge := make(chan []*ProducerMessage) @@ -514,7 +511,6 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) chan<- *ProducerMessag return input } -// one per broker // groups messages together into appropriately-sized batches for sending to the broker // based on https://godoc.org/github.com/eapache/channels#BatchingChannel type aggregator struct { @@ -522,20 +518,14 @@ type aggregator struct { broker *Broker input <-chan *ProducerMessage output chan<- []*ProducerMessage + + buffer []*ProducerMessage + bufferBytes int + timer <-chan time.Time } func (a *aggregator) run() { - var ( - timer <-chan time.Time - buffer []*ProducerMessage - flushTriggered chan<- []*ProducerMessage - bytesAccumulated int - defaultFlush bool - ) - - if a.parent.conf.Producer.Flush.Frequency == 0 && a.parent.conf.Producer.Flush.Bytes == 0 && a.parent.conf.Producer.Flush.Messages == 0 { - defaultFlush = true - } + var output chan<- []*ProducerMessage for { select { @@ -544,45 +534,77 @@ func (a *aggregator) run() { goto shutdown } - if (bytesAccumulated+msg.byteSize() >= forceFlushThreshold()) || - (a.parent.conf.Producer.Compression != CompressionNone && bytesAccumulated+msg.byteSize() >= a.parent.conf.Producer.MaxMessageBytes) || - (a.parent.conf.Producer.Flush.MaxMessages > 0 && len(buffer) >= a.parent.conf.Producer.Flush.MaxMessages) { + if a.wouldOverflow(msg) { Logger.Printf("producer/aggregator/%d maximum request accumulated, forcing blocking flush\n", a.broker.ID()) - a.output <- buffer - timer = nil - buffer = nil - flushTriggered = nil - bytesAccumulated = 0 + a.output <- a.buffer + a.reset() + output = nil } - buffer = append(buffer, msg) - bytesAccumulated += msg.byteSize() + a.buffer = append(a.buffer, msg) + a.bufferBytes += msg.byteSize() - if defaultFlush || - msg.flags&chaser == chaser || - (a.parent.conf.Producer.Flush.Messages > 0 && len(buffer) >= a.parent.conf.Producer.Flush.Messages) || - (a.parent.conf.Producer.Flush.Bytes > 0 && bytesAccumulated >= a.parent.conf.Producer.Flush.Bytes) { - flushTriggered = a.output - } else if a.parent.conf.Producer.Flush.Frequency > 0 && timer == nil { - timer = time.After(a.parent.conf.Producer.Flush.Frequency) + if a.readyToFlush(msg) { + output = a.output + } else if a.parent.conf.Producer.Flush.Frequency > 0 && a.timer == nil { + a.timer = time.After(a.parent.conf.Producer.Flush.Frequency) } - case <-timer: - flushTriggered = a.output - case flushTriggered <- buffer: - timer = nil - buffer = nil - flushTriggered = nil - bytesAccumulated = 0 + case <-a.timer: + output = a.output + case output <- a.buffer: + a.reset() + output = nil } } shutdown: - if len(buffer) > 0 { - a.output <- buffer + if len(a.buffer) > 0 { + a.output <- a.buffer } close(a.output) } +func (a *aggregator) wouldOverflow(msg *ProducerMessage) bool { + switch { + // Would we overflow our maximum possible size-on-the-wire? 10KiB is arbitrary overhead for safety. + case a.bufferBytes+msg.byteSize() >= int(MaxRequestSize-(10*1024)): + return true + // Would we overflow the size-limit of a compressed message-batch? + case a.parent.conf.Producer.Compression != CompressionNone && a.bufferBytes+msg.byteSize() >= a.parent.conf.Producer.MaxMessageBytes: + return true + // Would we overflow simply in number of messages? + case a.parent.conf.Producer.Flush.MaxMessages > 0 && len(a.buffer) >= a.parent.conf.Producer.Flush.MaxMessages: + return true + default: + return false + } +} + +func (a *aggregator) readyToFlush(msg *ProducerMessage) bool { + switch { + // If all three config values are 0, we always flush as-fast-as-possible + case a.parent.conf.Producer.Flush.Frequency == 0 && a.parent.conf.Producer.Flush.Bytes == 0 && a.parent.conf.Producer.Flush.Messages == 0: + return true + // If the messages is a chaser we must flush to maintain the state-machine + case msg.flags&chaser == chaser: + return true + // If we've passed the message trigger-point + case a.parent.conf.Producer.Flush.Messages > 0 && len(a.buffer) >= a.parent.conf.Producer.Flush.Messages: + return true + // If we've passed the byte trigger-point + case a.parent.conf.Producer.Flush.Bytes > 0 && a.bufferBytes >= a.parent.conf.Producer.Flush.Bytes: + return true + default: + return false + } +} + +func (a *aggregator) reset() { + a.timer = nil + a.buffer = nil + a.bufferBytes = 0 +} + // takes a batch at a time from the aggregator and sends to the broker type flusher struct { parent *asyncProducer diff --git a/config.go b/config.go index 4bbb9c738..0fae111e9 100644 --- a/config.go +++ b/config.go @@ -180,11 +180,11 @@ func (c *Config) Validate() error { if c.Producer.RequiredAcks > 1 { Logger.Println("Producer.RequiredAcks > 1 is deprecated and will raise an exception with kafka >= 0.8.2.0.") } - if c.Producer.MaxMessageBytes >= forceFlushThreshold() { - Logger.Println("Producer.MaxMessageBytes is too close to MaxRequestSize; it will be ignored.") + if c.Producer.MaxMessageBytes >= int(MaxRequestSize) { + Logger.Println("Producer.MaxMessageBytes is larger than MaxRequestSize; it will be ignored.") } - if c.Producer.Flush.Bytes >= forceFlushThreshold() { - Logger.Println("Producer.Flush.Bytes is too close to MaxRequestSize; it will be ignored.") + if c.Producer.Flush.Bytes >= int(MaxRequestSize) { + Logger.Println("Producer.Flush.Bytes is larger than MaxRequestSize; it will be ignored.") } if c.Producer.Timeout%time.Millisecond != 0 { Logger.Println("Producer.Timeout only supports millisecond resolution; nanoseconds will be truncated.")