Skip to content

Commit

Permalink
Retry messages on shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
eapache committed Apr 24, 2015
1 parent 3534171 commit 00db3e7
Showing 1 changed file with 43 additions and 59 deletions.
102 changes: 43 additions & 59 deletions async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type asyncProducer struct {

errors chan *ProducerError
input, successes, retries chan *ProducerMessage
inFlight sync.WaitGroup

brokers map[*Broker]chan *ProducerMessage
brokerRefs map[chan *ProducerMessage]int
Expand Down Expand Up @@ -105,8 +106,6 @@ type flagSet int8

const (
chaser flagSet = 1 << iota // message is last in a group that failed
ref // add a reference to a singleton channel
unref // remove a reference from a singleton channel
shutdown // start the shutdown process
)

Expand Down Expand Up @@ -209,6 +208,7 @@ func (p *asyncProducer) AsyncClose() {
// dispatches messages by topic
func (p *asyncProducer) topicDispatcher() {
handlers := make(map[string]chan *ProducerMessage)
shuttingDown := false

for msg := range p.input {
if msg == nil {
Expand All @@ -218,7 +218,16 @@ func (p *asyncProducer) topicDispatcher() {

if msg.flags&shutdown != 0 {
Logger.Println("Producer shutting down.")
break
shuttingDown = true
go withRecover(p.shutdown)
continue
} else if msg.retries == 0 {
if shuttingDown {
p.returnError(msg, ErrShuttingDown)
continue
} else {
p.inFlight.Add(1)
}
}

if (p.conf.Producer.Compression == CompressionNone && msg.Value != nil && msg.Value.Length() > p.conf.Producer.MaxMessageBytes) ||
Expand All @@ -230,7 +239,6 @@ func (p *asyncProducer) topicDispatcher() {

handler := handlers[msg.Topic]
if handler == nil {
p.retries <- &ProducerMessage{flags: ref}
newHandler := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
topic := msg.Topic // block local because go's closure semantics suck
go withRecover(func() { p.partitionDispatcher(topic, newHandler) })
Expand All @@ -244,21 +252,6 @@ func (p *asyncProducer) topicDispatcher() {
for _, handler := range handlers {
close(handler)
}

p.retries <- &ProducerMessage{flags: shutdown}

for msg := range p.input {
p.returnError(msg, ErrShuttingDown)
}

if p.ownClient {
err := p.client.Close()
if err != nil {
Logger.Println("producer/shutdown failed to close the embedded client:", err)
}
}
close(p.errors)
close(p.successes)
}

// one per topic
Expand All @@ -281,7 +274,6 @@ func (p *asyncProducer) partitionDispatcher(topic string, input chan *ProducerMe

handler := handlers[msg.Partition]
if handler == nil {
p.retries <- &ProducerMessage{flags: ref}
newHandler := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
topic := msg.Topic // block local because go's closure semantics suck
partition := msg.Partition // block local because go's closure semantics suck
Expand All @@ -296,7 +288,6 @@ func (p *asyncProducer) partitionDispatcher(topic string, input chan *ProducerMe
for _, handler := range handlers {
close(handler)
}
p.retries <- &ProducerMessage{flags: unref}
}

// one per partition per topic
Expand Down Expand Up @@ -414,7 +405,6 @@ func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input ch
if output != nil {
p.unrefBrokerProducer(leader, output)
}
p.retries <- &ProducerMessage{flags: unref}
}

// one per broker
Expand Down Expand Up @@ -543,9 +533,7 @@ func (p *asyncProducer) flusher(broker *Broker, input chan []*ProducerMessage) {

if response == nil {
// this only happens when RequiredAcks is NoResponse, so we have to assume success
if p.conf.Producer.Return.Successes {
p.returnSuccesses(batch)
}
p.returnSuccesses(batch)
continue
}

Expand All @@ -563,12 +551,10 @@ func (p *asyncProducer) flusher(broker *Broker, input chan []*ProducerMessage) {
switch block.Err {
case ErrNoError:
// All the messages for this topic-partition were delivered successfully!
if p.conf.Producer.Return.Successes {
for i := range msgs {
msgs[i].Offset = block.Offset + int64(i)
}
p.returnSuccesses(msgs)
for i := range msgs {
msgs[i].Offset = block.Offset + int64(i)
}
p.returnSuccesses(msgs)
case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable,
ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
Logger.Printf("producer/flusher/%d state change to [retrying] on %s/%d because %v\n",
Expand All @@ -585,19 +571,14 @@ func (p *asyncProducer) flusher(broker *Broker, input chan []*ProducerMessage) {
}
}
Logger.Printf("producer/flusher/%d shut down\n", broker.ID())
p.retries <- &ProducerMessage{flags: unref}
}

// singleton
// effectively a "bridge" between the flushers and the topicDispatcher in order to avoid deadlock
// based on https://godoc.org/github.com/eapache/channels#InfiniteChannel
func (p *asyncProducer) retryHandler() {
var (
msg *ProducerMessage
buf = queue.New()
refs = 0
shuttingDown = false
)
var msg *ProducerMessage
buf := queue.New()

for {
if buf.Length() == 0 {
Expand All @@ -611,36 +592,35 @@ func (p *asyncProducer) retryHandler() {
}
}

if msg.flags&ref != 0 {
refs++
} else if msg.flags&unref != 0 {
refs--
if refs == 0 && shuttingDown {
break
}
} else if msg.flags&shutdown != 0 {
shuttingDown = true
if refs == 0 {
break
}
} else {
buf.Add(msg)
if msg == nil {
return
}
}

close(p.retries)
for buf.Length() != 0 {
p.input <- buf.Peek().(*ProducerMessage)
buf.Remove()
buf.Add(msg)
}
close(p.input)
}

///////////////////////////////////////////
///////////////////////////////////////////

// utility functions

func (p *asyncProducer) shutdown() {
p.inFlight.Wait()

if p.ownClient {
err := p.client.Close()
if err != nil {
Logger.Println("producer/shutdown failed to close the embedded client:", err)
}
}

close(p.errors)
close(p.successes)
close(p.input)
close(p.retries)
}

func (p *asyncProducer) assignPartition(partitioner Partitioner, msg *ProducerMessage) error {
var partitions []int32
var err error
Expand Down Expand Up @@ -745,6 +725,7 @@ func (p *asyncProducer) returnError(msg *ProducerMessage, err error) {
} else {
Logger.Println(pErr)
}
p.inFlight.Done()
}

func (p *asyncProducer) returnErrors(batch []*ProducerMessage, err error) {
Expand All @@ -757,10 +738,14 @@ func (p *asyncProducer) returnErrors(batch []*ProducerMessage, err error) {

func (p *asyncProducer) returnSuccesses(batch []*ProducerMessage) {
for _, msg := range batch {
if msg != nil {
if msg == nil {
continue
}
if p.conf.Producer.Return.Successes {
msg.flags = 0
p.successes <- msg
}
p.inFlight.Done()
}
}

Expand All @@ -785,7 +770,6 @@ func (p *asyncProducer) getBrokerProducer(broker *Broker) chan *ProducerMessage
bp := p.brokers[broker]

if bp == nil {
p.retries <- &ProducerMessage{flags: ref}
bp = make(chan *ProducerMessage)
p.brokers[broker] = bp
p.brokerRefs[bp] = 0
Expand Down

0 comments on commit 00db3e7

Please sign in to comment.