Skip to content

Commit

Permalink
Structure the aggregator and flusher goroutines
Browse files Browse the repository at this point in the history
Another chunk of IBM#300. Once this lands the final step will be to break out the
existing massive `run()` methods into useful helper functions now that
parameter-passing isn't such a pain.

In theory no functional changes; just slightly nicer code and stricter typing
around channel direction.
  • Loading branch information
eapache authored and aaronkavlie-wf committed Aug 18, 2015
1 parent fa444e4 commit 4d05c66
Showing 1 changed file with 79 additions and 48 deletions.
127 changes: 79 additions & 48 deletions async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ type asyncProducer struct {
input, successes, retries chan *ProducerMessage
inFlight sync.WaitGroup

brokers map[*Broker]chan *ProducerMessage
brokerRefs map[chan *ProducerMessage]int
brokers map[*Broker]chan<- *ProducerMessage
brokerRefs map[chan<- *ProducerMessage]int
brokerLock sync.Mutex
}

Expand Down Expand Up @@ -91,8 +91,8 @@ func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) {
input: make(chan *ProducerMessage),
successes: make(chan *ProducerMessage),
retries: make(chan *ProducerMessage),
brokers: make(map[*Broker]chan *ProducerMessage),
brokerRefs: make(map[chan *ProducerMessage]int),
brokers: make(map[*Broker]chan<- *ProducerMessage),
brokerRefs: make(map[chan<- *ProducerMessage]int),
}

// launch our singleton dispatchers
Expand Down Expand Up @@ -347,7 +347,7 @@ type partitionProducer struct {

leader *Broker
breaker *breaker.Breaker
output chan *ProducerMessage
output chan<- *ProducerMessage

// highWatermark tracks the "current" retry level, which is the only one where we actually let messages through,
// all other messages get buffered in retryState[msg.retries].buf to preserve ordering
Expand Down Expand Up @@ -491,37 +491,63 @@ func (pp *partitionProducer) updateLeader() error {
})
}

func (p *asyncProducer) newBrokerProducer(broker *Broker) chan<- *ProducerMessage {
input := make(chan *ProducerMessage)
bridge := make(chan []*ProducerMessage)

a := &aggregator{
parent: p,
broker: broker,
input: input,
output: bridge,
}
go withRecover(a.run)

f := &flusher{
parent: p,
broker: broker,
input: bridge,
}
go withRecover(f.run)

return input
}

// one per broker
// groups messages together into appropriately-sized batches for sending to the broker
// based on https://godoc.org/github.com/eapache/channels#BatchingChannel
func (p *asyncProducer) messageAggregator(broker *Broker, input <-chan *ProducerMessage) {
type aggregator struct {
parent *asyncProducer
broker *Broker
input <-chan *ProducerMessage
output chan<- []*ProducerMessage
}

func (a *aggregator) run() {
var (
timer <-chan time.Time
buffer []*ProducerMessage
flushTriggered chan []*ProducerMessage
flushTriggered chan<- []*ProducerMessage
bytesAccumulated int
defaultFlush bool
)

if p.conf.Producer.Flush.Frequency == 0 && p.conf.Producer.Flush.Bytes == 0 && p.conf.Producer.Flush.Messages == 0 {
if a.parent.conf.Producer.Flush.Frequency == 0 && a.parent.conf.Producer.Flush.Bytes == 0 && a.parent.conf.Producer.Flush.Messages == 0 {
defaultFlush = true
}

output := make(chan []*ProducerMessage)
go withRecover(func() { p.flusher(broker, output) })

for {
select {
case msg := <-input:
case msg := <-a.input:
if msg == nil {
goto shutdown
}

if (bytesAccumulated+msg.byteSize() >= forceFlushThreshold()) ||
(p.conf.Producer.Compression != CompressionNone && bytesAccumulated+msg.byteSize() >= p.conf.Producer.MaxMessageBytes) ||
(p.conf.Producer.Flush.MaxMessages > 0 && len(buffer) >= p.conf.Producer.Flush.MaxMessages) {
Logger.Printf("producer/aggregator/%d maximum request accumulated, forcing blocking flush\n", broker.ID())
output <- buffer
(a.parent.conf.Producer.Compression != CompressionNone && bytesAccumulated+msg.byteSize() >= a.parent.conf.Producer.MaxMessageBytes) ||
(a.parent.conf.Producer.Flush.MaxMessages > 0 && len(buffer) >= a.parent.conf.Producer.Flush.MaxMessages) {
Logger.Printf("producer/aggregator/%d maximum request accumulated, forcing blocking flush\n", a.broker.ID())
a.output <- buffer
timer = nil
buffer = nil
flushTriggered = nil
Expand All @@ -533,14 +559,14 @@ func (p *asyncProducer) messageAggregator(broker *Broker, input <-chan *Producer

if defaultFlush ||
msg.flags&chaser == chaser ||
(p.conf.Producer.Flush.Messages > 0 && len(buffer) >= p.conf.Producer.Flush.Messages) ||
(p.conf.Producer.Flush.Bytes > 0 && bytesAccumulated >= p.conf.Producer.Flush.Bytes) {
flushTriggered = output
} else if p.conf.Producer.Flush.Frequency > 0 && timer == nil {
timer = time.After(p.conf.Producer.Flush.Frequency)
(a.parent.conf.Producer.Flush.Messages > 0 && len(buffer) >= a.parent.conf.Producer.Flush.Messages) ||
(a.parent.conf.Producer.Flush.Bytes > 0 && bytesAccumulated >= a.parent.conf.Producer.Flush.Bytes) {
flushTriggered = a.output
} else if a.parent.conf.Producer.Flush.Frequency > 0 && timer == nil {
timer = time.After(a.parent.conf.Producer.Flush.Frequency)
}
case <-timer:
flushTriggered = output
flushTriggered = a.output
case flushTriggered <- buffer:
timer = nil
buffer = nil
Expand All @@ -551,21 +577,27 @@ func (p *asyncProducer) messageAggregator(broker *Broker, input <-chan *Producer

shutdown:
if len(buffer) > 0 {
output <- buffer
a.output <- buffer
}
close(output)
close(a.output)
}

// one per broker
// takes a batch at a time from the messageAggregator and sends to the broker
func (p *asyncProducer) flusher(broker *Broker, input <-chan []*ProducerMessage) {
type flusher struct {
parent *asyncProducer
broker *Broker
input <-chan []*ProducerMessage
}

func (f *flusher) run() {
var closing error
currentRetries := make(map[string]map[int32]error)
Logger.Printf("producer/flusher/%d starting up\n", broker.ID())
Logger.Printf("producer/flusher/%d starting up\n", f.broker.ID())

for batch := range input {
for batch := range f.input {
if closing != nil {
p.retryMessages(batch, closing)
f.parent.retryMessages(batch, closing)
continue
}

Expand All @@ -576,10 +608,10 @@ func (p *asyncProducer) flusher(broker *Broker, input <-chan []*ProducerMessage)
if msg.flags&chaser == chaser {
// we can start processing this topic/partition again
Logger.Printf("producer/flusher/%d state change to [normal] on %s/%d\n",
broker.ID(), msg.Topic, msg.Partition)
f.broker.ID(), msg.Topic, msg.Partition)
currentRetries[msg.Topic][msg.Partition] = nil
}
p.retryMessages([]*ProducerMessage{msg}, currentRetries[msg.Topic][msg.Partition])
f.parent.retryMessages([]*ProducerMessage{msg}, currentRetries[msg.Topic][msg.Partition])
batch[i] = nil // to prevent it being returned/retried twice
continue
}
Expand All @@ -593,31 +625,31 @@ func (p *asyncProducer) flusher(broker *Broker, input <-chan []*ProducerMessage)
partitionSet[msg.Partition] = append(partitionSet[msg.Partition], msg)
}

request := p.buildRequest(msgSets)
request := f.parent.buildRequest(msgSets)
if request == nil {
continue
}

response, err := broker.Produce(request)
response, err := f.broker.Produce(request)

switch err.(type) {
case nil:
break
case PacketEncodingError:
p.returnErrors(batch, err)
f.parent.returnErrors(batch, err)
continue
default:
Logger.Printf("producer/flusher/%d state change to [closing] because %s\n", broker.ID(), err)
p.abandonBrokerConnection(broker)
_ = broker.Close()
Logger.Printf("producer/flusher/%d state change to [closing] because %s\n", f.broker.ID(), err)
f.parent.abandonBrokerConnection(f.broker)
_ = f.broker.Close()
closing = err
p.retryMessages(batch, err)
f.parent.retryMessages(batch, err)
continue
}

if response == nil {
// this only happens when RequiredAcks is NoResponse, so we have to assume success
p.returnSuccesses(batch)
f.parent.returnSuccesses(batch)
continue
}

Expand All @@ -628,7 +660,7 @@ func (p *asyncProducer) flusher(broker *Broker, input <-chan []*ProducerMessage)

block := response.GetBlock(topic, partition)
if block == nil {
p.returnErrors(msgs, ErrIncompleteResponse)
f.parent.returnErrors(msgs, ErrIncompleteResponse)
continue
}

Expand All @@ -638,23 +670,23 @@ func (p *asyncProducer) flusher(broker *Broker, input <-chan []*ProducerMessage)
for i := range msgs {
msgs[i].Offset = block.Offset + int64(i)
}
p.returnSuccesses(msgs)
f.parent.returnSuccesses(msgs)
case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable,
ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
Logger.Printf("producer/flusher/%d state change to [retrying] on %s/%d because %v\n",
broker.ID(), topic, partition, block.Err)
f.broker.ID(), topic, partition, block.Err)
if currentRetries[topic] == nil {
currentRetries[topic] = make(map[int32]error)
}
currentRetries[topic][partition] = block.Err
p.retryMessages(msgs, block.Err)
f.parent.retryMessages(msgs, block.Err)
default:
p.returnErrors(msgs, block.Err)
f.parent.returnErrors(msgs, block.Err)
}
}
}
}
Logger.Printf("producer/flusher/%d shut down\n", broker.ID())
Logger.Printf("producer/flusher/%d shut down\n", f.broker.ID())
}

// singleton
Expand Down Expand Up @@ -814,25 +846,24 @@ func (p *asyncProducer) retryMessages(batch []*ProducerMessage, err error) {
}
}

func (p *asyncProducer) getBrokerProducer(broker *Broker) chan *ProducerMessage {
func (p *asyncProducer) getBrokerProducer(broker *Broker) chan<- *ProducerMessage {
p.brokerLock.Lock()
defer p.brokerLock.Unlock()

bp := p.brokers[broker]

if bp == nil {
bp = make(chan *ProducerMessage)
bp = p.newBrokerProducer(broker)
p.brokers[broker] = bp
p.brokerRefs[bp] = 0
go withRecover(func() { p.messageAggregator(broker, bp) })
}

p.brokerRefs[bp]++

return bp
}

func (p *asyncProducer) unrefBrokerProducer(broker *Broker, bp chan *ProducerMessage) {
func (p *asyncProducer) unrefBrokerProducer(broker *Broker, bp chan<- *ProducerMessage) {
p.brokerLock.Lock()
defer p.brokerLock.Unlock()

Expand Down

0 comments on commit 4d05c66

Please sign in to comment.