Retry messages on shutdown #420

Merged (2 commits) · Apr 27, 2015
111 changes: 48 additions & 63 deletions async_producer.go
@@ -54,6 +54,7 @@ type asyncProducer struct {

errors chan *ProducerError
input, successes, retries chan *ProducerMessage
inFlight sync.WaitGroup

brokers map[*Broker]chan *ProducerMessage
brokerRefs map[chan *ProducerMessage]int
@@ -105,8 +106,6 @@ type flagSet int8

const (
chaser flagSet = 1 << iota // message is last in a group that failed
ref // add a reference to a singleton channel
unref // remove a reference from a singleton channel
shutdown // start the shutdown process
)

@@ -193,9 +192,7 @@ func (p *asyncProducer) Close() error {
}

func (p *asyncProducer) AsyncClose() {
go withRecover(func() {
p.input <- &ProducerMessage{flags: shutdown}
})
go withRecover(p.shutdown)
}

///////////////////////////////////////////
@@ -209,6 +206,7 @@ func (p *asyncProducer) AsyncClose() {
// dispatches messages by topic
func (p *asyncProducer) topicDispatcher() {
handlers := make(map[string]chan *ProducerMessage)
shuttingDown := false

for msg := range p.input {
if msg == nil {
@@ -217,8 +215,14 @@ }
}

if msg.flags&shutdown != 0 {
Contributor:

There will only be one message with the shutdown flag set, and it doesn't need to propagate further anymore, right?

Instead of sending a message with a flag, could we simply use a closing chan none for this now? Or am I missing something? I'd rather minimize the number of "special" messages that go through the pipeline.

Contributor (Author):

> There will only be one message with the shutdown flag set, and it doesn't need to propagate further anymore, right?

I guess in theory there could be several if the user calls AsyncClose several times, but I don't feel like guarding against that case.

> Instead of sending a message with a flag, could we simply use a closing chan none for this now? Or am I missing something? I'd rather minimize the number of "special" messages that go through the pipeline.

It would mean this goroutine would need a select instead of a nice simple range... not sure that really ends up being any cleaner; I'll mock it up quickly to see.

Contributor (Author):

> It would mean this goroutine would need a select instead of a nice simple range... not sure that really ends up being any cleaner; I'll mock it up quickly to see.

Really no simpler, because once the closing chan is closed we have to move to a different loop somewhere or we'll spin forever, since a closed channel is always ready for reading (and we still have to process things coming in on the input channel for a while longer).

Contributor (Author):

Working through that did give me an idea for an additional minor simplification though (see the third commit).
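To make the trade-off concrete, here is a minimal standalone sketch (not code from this PR; `input` and `done` are hypothetical names) of the select that a closing channel would force, and of the nil-channel trick that stops a closed channel from being selected forever if you don't want to jump to a separate drain loop:

```go
package main

import "fmt"

func main() {
	input := make(chan int, 3)
	done := make(chan struct{}) // the hypothetical "closing chan none"

	input <- 1
	input <- 2
	close(done) // shutdown is signalled...
	input <- 3  // ...but messages can still arrive for a while afterwards
	close(input)

	for {
		select {
		case msg, ok := <-input:
			if !ok {
				return // input fully drained, safe to stop
			}
			fmt.Println("processed", msg)
		case <-done:
			// A closed channel is always ready, so without the line below the
			// select would keep taking this branch instead of draining input.
			done = nil // a nil channel is never ready; this case is now disabled
		}
	}
}
```

Either way the plain `for msg := range p.input` loop is gone, which is the "nice simple range" the author prefers to keep.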

Logger.Println("Producer shutting down.")
break
shuttingDown = true
continue
} else if msg.retries == 0 {
p.inFlight.Add(1)
if shuttingDown {
p.returnError(msg, ErrShuttingDown)
Contributor:

This reduces the in-flight counter, but it was never incremented for this message. We should probably move the p.inFlight.Add(1) up so it always gets executed.

Contributor (Author):

Oooh, really nice catch. I will fix this and see if I can add or adjust a test for this case too.

Contributor (Author):

OK, this path is fixed and tested now.
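The rule being enforced here can be reduced to a small sketch (hypothetical names; not the actual Sarama code): every message gets exactly one inFlight.Add(1), executed before any path that might call Done() on it, including the early ErrShuttingDown return. Otherwise a Done() without a matching Add() panics with a negative WaitGroup counter, and the Wait() in shutdown becomes unreliable:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errShuttingDown = errors.New("producer is shutting down")

type message struct{ value string }

type producer struct {
	inFlight     sync.WaitGroup
	shuttingDown bool
}

func (p *producer) dispatch(msg *message) {
	p.inFlight.Add(1) // count the message first, on every path
	if p.shuttingDown {
		p.returnError(msg, errShuttingDown) // calls Done(); Add has already run
		return
	}
	p.deliver(msg)
}

func (p *producer) deliver(msg *message) {
	fmt.Println("delivered:", msg.value)
	p.inFlight.Done() // exactly one Done per Add, success path
}

func (p *producer) returnError(msg *message, err error) {
	fmt.Println("failed:", msg.value, err)
	p.inFlight.Done() // exactly one Done per Add, failure path
}

func main() {
	p := &producer{}
	p.dispatch(&message{"a"})
	p.shuttingDown = true
	p.dispatch(&message{"b"}) // rejected, but still balanced in the WaitGroup
	p.inFlight.Wait()         // shutdown can now proceed safely
}
```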

continue
}
}

if (p.conf.Producer.Compression == CompressionNone && msg.Value != nil && msg.Value.Length() > p.conf.Producer.MaxMessageBytes) ||
@@ -230,7 +234,6 @@

handler := handlers[msg.Topic]
if handler == nil {
p.retries <- &ProducerMessage{flags: ref}
newHandler := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
topic := msg.Topic // block local because go's closure semantics suck
go withRecover(func() { p.partitionDispatcher(topic, newHandler) })
@@ -244,21 +247,6 @@
for _, handler := range handlers {
close(handler)
}

p.retries <- &ProducerMessage{flags: shutdown}

for msg := range p.input {
p.returnError(msg, ErrShuttingDown)
}

if p.ownClient {
err := p.client.Close()
if err != nil {
Logger.Println("producer/shutdown failed to close the embedded client:", err)
}
}
close(p.errors)
close(p.successes)
}
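An aside on the "block local because go's closure semantics suck" comments in the dispatchers: the range variable is reused across iterations (this long predates the Go 1.22 loop-variable change), so a goroutine closure that captured `msg` directly could observe a later message. A standalone sketch of the pitfall being avoided, with hypothetical names:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	topics := []string{"a", "b", "c"}
	var wg sync.WaitGroup

	for _, t := range topics {
		topic := t // block-local copy: each closure captures its own variable
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Capturing `t` directly (pre Go 1.22) could print the same, last
			// value three times; capturing the copy prints a, b and c.
			fmt.Println("starting handler for topic", topic)
		}()
	}
	wg.Wait()
}
```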

// one per topic
@@ -281,7 +269,6 @@ func (p *asyncProducer) partitionDispatcher(topic string, input chan *ProducerMe

handler := handlers[msg.Partition]
if handler == nil {
p.retries <- &ProducerMessage{flags: ref}
newHandler := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
topic := msg.Topic // block local because go's closure semantics suck
partition := msg.Partition // block local because go's closure semantics suck
@@ -296,7 +283,6 @@ func (p *asyncProducer) partitionDispatcher(topic string, input chan *ProducerMe
for _, handler := range handlers {
close(handler)
}
p.retries <- &ProducerMessage{flags: unref}
}

// one per partition per topic
@@ -344,6 +330,7 @@ func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input ch
highWatermark = msg.retries
Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", topic, partition, highWatermark)
retryState[msg.retries].expectChaser = true
p.inFlight.Add(1) // we're generating a chaser message; track it so we don't shut down while it's still inflight
output <- &ProducerMessage{Topic: topic, Partition: partition, flags: chaser, retries: msg.retries - 1}
Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", topic, partition, leader.ID())
p.unrefBrokerProducer(leader, output)
@@ -355,6 +342,7 @@ func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input ch
// in fact this message is not even the current retry level, so buffer it for now (unless it's just a chaser)
if msg.flags&chaser == chaser {
retryState[msg.retries].expectChaser = false
p.inFlight.Done() // this chaser is now handled and will be garbage collected
} else {
retryState[msg.retries].buf = append(retryState[msg.retries].buf, msg)
}
@@ -392,6 +380,7 @@ func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input ch
}

}
p.inFlight.Done() // this chaser is now handled and will be garbage collected
continue
}
}
@@ -414,7 +403,6 @@ func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input ch
if output != nil {
p.unrefBrokerProducer(leader, output)
}
p.retries <- &ProducerMessage{flags: unref}
}

// one per broker
@@ -543,9 +531,7 @@ func (p *asyncProducer) flusher(broker *Broker, input chan []*ProducerMessage) {

if response == nil {
// this only happens when RequiredAcks is NoResponse, so we have to assume success
if p.conf.Producer.Return.Successes {
p.returnSuccesses(batch)
}
p.returnSuccesses(batch)
continue
}

@@ -563,12 +549,10 @@ func (p *asyncProducer) flusher(broker *Broker, input chan []*ProducerMessage) {
switch block.Err {
case ErrNoError:
// All the messages for this topic-partition were delivered successfully!
if p.conf.Producer.Return.Successes {
for i := range msgs {
msgs[i].Offset = block.Offset + int64(i)
}
p.returnSuccesses(msgs)
for i := range msgs {
msgs[i].Offset = block.Offset + int64(i)
}
p.returnSuccesses(msgs)
case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable,
ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
Logger.Printf("producer/flusher/%d state change to [retrying] on %s/%d because %v\n",
@@ -585,19 +569,14 @@ }
}
}
Logger.Printf("producer/flusher/%d shut down\n", broker.ID())
p.retries <- &ProducerMessage{flags: unref}
}

// singleton
// effectively a "bridge" between the flushers and the topicDispatcher in order to avoid deadlock
// based on https://godoc.org/github.com/eapache/channels#InfiniteChannel
func (p *asyncProducer) retryHandler() {
var (
msg *ProducerMessage
buf = queue.New()
refs = 0
shuttingDown = false
)
var msg *ProducerMessage
buf := queue.New()

for {
if buf.Length() == 0 {
@@ -611,36 +590,38 @@
}
}

if msg.flags&ref != 0 {
refs++
} else if msg.flags&unref != 0 {
refs--
if refs == 0 && shuttingDown {
break
}
} else if msg.flags&shutdown != 0 {
shuttingDown = true
if refs == 0 {
break
}
} else {
buf.Add(msg)
if msg == nil {
return
}
}

close(p.retries)
for buf.Length() != 0 {
p.input <- buf.Peek().(*ProducerMessage)
buf.Remove()
buf.Add(msg)
}
close(p.input)
}
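The "bridge" that the retryHandler comment describes can be sketched independently of Sarama: a goroutine holding an unbounded in-memory queue between an input and an output channel, so sends into it never block and cannot deadlock the goroutines on either side. This sketch uses container/list and made-up names, whereas the real code uses github.com/eapache/queue:

```go
package main

import (
	"container/list"
	"fmt"
)

// bridge forwards values from in to out, buffering without bound in between.
func bridge(in <-chan int, out chan<- int) {
	buf := list.New()
	for {
		// Only offer to send when something is actually buffered; a send on a
		// nil channel never proceeds, so that select case is disabled otherwise.
		var send chan<- int
		var next int
		if buf.Len() > 0 {
			send = out
			next = buf.Front().Value.(int)
		}
		select {
		case v, ok := <-in:
			if !ok {
				for buf.Len() > 0 { // input closed: flush whatever is left
					out <- buf.Remove(buf.Front()).(int)
				}
				close(out)
				return
			}
			buf.PushBack(v)
		case send <- next:
			buf.Remove(buf.Front())
		}
	}
}

func main() {
	in := make(chan int)
	out := make(chan int)
	go bridge(in, out)

	go func() {
		for i := 0; i < 5; i++ {
			in <- i // never blocks for long, regardless of the consumer
		}
		close(in)
	}()

	for v := range out {
		fmt.Println("forwarded", v)
	}
}
```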

///////////////////////////////////////////
///////////////////////////////////////////

// utility functions

func (p *asyncProducer) shutdown() {
Logger.Println("Producer shutting down.")
p.input <- &ProducerMessage{flags: shutdown}

p.inFlight.Wait()

if p.ownClient {
err := p.client.Close()
if err != nil {
Logger.Println("producer/shutdown failed to close the embedded client:", err)
}
}

close(p.input)
close(p.retries)
close(p.errors)
close(p.successes)
}

func (p *asyncProducer) assignPartition(partitioner Partitioner, msg *ProducerMessage) error {
var partitions []int32
var err error
@@ -745,6 +726,7 @@ func (p *asyncProducer) returnError(msg *ProducerMessage, err error) {
} else {
Logger.Println(pErr)
}
p.inFlight.Done()
}

func (p *asyncProducer) returnErrors(batch []*ProducerMessage, err error) {
@@ -757,10 +739,14 @@ func (p *asyncProducer) returnErrors(batch []*ProducerMessage, err error) {

func (p *asyncProducer) returnSuccesses(batch []*ProducerMessage) {
for _, msg := range batch {
if msg != nil {
if msg == nil {
continue
}
if p.conf.Producer.Return.Successes {
msg.flags = 0
p.successes <- msg
}
p.inFlight.Done()
}
}

@@ -785,7 +771,6 @@ func (p *asyncProducer) getBrokerProducer(broker *Broker) chan *ProducerMessage
bp := p.brokers[broker]

if bp == nil {
p.retries <- &ProducerMessage{flags: ref}
bp = make(chan *ProducerMessage)
p.brokers[broker] = bp
p.brokerRefs[bp] = 0
50 changes: 50 additions & 0 deletions async_producer_test.go
@@ -6,6 +6,7 @@ import (
"os/signal"
"sync"
"testing"
"time"
)

const TestMessage = "ABC THE MESSAGE"
@@ -525,6 +526,55 @@ func TestAsyncProducerFlusherRetryCondition(t *testing.T) {
leader.Close()
}

func TestAsyncProducerRetryShutdown(t *testing.T) {
seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 2)

metadataLeader := new(MetadataResponse)
metadataLeader.AddBroker(leader.Addr(), leader.BrokerID())
metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
seedBroker.Returns(metadataLeader)

config := NewConfig()
config.Producer.Flush.Messages = 10
config.Producer.Return.Successes = true
config.Producer.Retry.Backoff = 0
producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

for i := 0; i < 10; i++ {
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
}
producer.AsyncClose()
time.Sleep(5 * time.Millisecond) // let the shutdown goroutine kick in

producer.Input() <- &ProducerMessage{Topic: "FOO"}
if err := <-producer.Errors(); err.Err != ErrShuttingDown {
t.Error(err)
}

prodNotLeader := new(ProduceResponse)
prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
leader.Returns(prodNotLeader)

seedBroker.Returns(metadataLeader)

prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
leader.Returns(prodSuccess)
expectSuccesses(t, producer, 10)

seedBroker.Close()
leader.Close()

// wait for the async-closed producer to shut down fully
for err := range producer.Errors() {
t.Error(err)
}
}

// This example shows how to use the producer while simultaneously
// reading the Errors channel to know about any failures.
func ExampleAsyncProducer_select() {