Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

consumer: bugfix for broker workers getting stuck #369

Merged
merged 1 commit into from
Mar 19, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
Bug Fixes:
- Fix the producer's internal reference counting in certain unusual scenarios
([#367](https://github.com/Shopify/sarama/pull/367)).
- Fix the consumer's internal reference counting in certain unusual scenarios
([#369](https://github.com/Shopify/sarama/pull/369)).
- Fix a condition where the producer's internal control messages could have
gotten stuck ([#368](https://github.com/Shopify/sarama/pull/368)).

Expand Down
43 changes: 26 additions & 17 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,10 @@ func (c *consumer) ConsumePartition(topic string, partition int32, offset int64)
return nil, err
}

if leader, err := c.client.Leader(child.topic, child.partition); err != nil {
var leader *Broker
var err error
if leader, err = c.client.Leader(child.topic, child.partition); err != nil {
return nil, err
} else {
child.broker = leader
}

if err := c.addChild(child); err != nil {
Expand All @@ -127,8 +127,8 @@ func (c *consumer) ConsumePartition(topic string, partition int32, offset int64)

go withRecover(child.dispatcher)

brokerWorker := c.refBrokerConsumer(child.broker)
brokerWorker.input <- child
child.broker = c.refBrokerConsumer(leader)
child.broker.input <- child

return child, nil
}
Expand Down Expand Up @@ -171,31 +171,39 @@ func (c *consumer) refBrokerConsumer(broker *Broker) *brokerConsumer {
newSubscriptions: make(chan []*partitionConsumer),
wait: make(chan none),
subscriptions: make(map[*partitionConsumer]none),
refs: 1,
refs: 0,
}
go withRecover(brokerWorker.subscriptionManager)
go withRecover(brokerWorker.subscriptionConsumer)
c.brokerConsumers[broker] = brokerWorker
} else {
brokerWorker.refs++
}

brokerWorker.refs++

return brokerWorker
}

func (c *consumer) unrefBrokerConsumer(broker *Broker) {
func (c *consumer) unrefBrokerConsumer(brokerWorker *brokerConsumer) {
c.lock.Lock()
defer c.lock.Unlock()

brokerWorker := c.brokerConsumers[broker]
brokerWorker.refs--

if brokerWorker.refs == 0 {
close(brokerWorker.input)
delete(c.brokerConsumers, broker)
if c.brokerConsumers[brokerWorker.broker] == brokerWorker {
delete(c.brokerConsumers, brokerWorker.broker)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So if I understand it correctly, this if case only triggers during a normal consumer shutdown? When we shut down because of a broker error, this delete will already have happened?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It triggers when all PartitionConsumers move off of the brokerConsumer "safely", either via being closed or via a NotLeaderForPartition response. When the broker blows up we abort, and the delete happens immediately to prevent new partitions from taking a reference.

}
}

// abandonBrokerConsumer removes the c.brokerConsumers entry keyed by
// brokerWorker.broker while holding c.lock. It does not modify
// brokerWorker.refs and does not close brokerWorker.input. The entry is
// deleted unconditionally, without checking that it still points at
// brokerWorker.
// NOTE(review): per the PR discussion this is called when the broker fails, so
// that new partition consumers cannot take a reference to the dead worker —
// confirm against the caller (brokerConsumer.abort).
func (c *consumer) abandonBrokerConsumer(brokerWorker *brokerConsumer) {
c.lock.Lock()
defer c.lock.Unlock()

delete(c.brokerConsumers, brokerWorker.broker)
}

// PartitionConsumer

// PartitionConsumer processes Kafka messages from a given topic and partition. You MUST call Close()
Expand Down Expand Up @@ -237,7 +245,7 @@ type partitionConsumer struct {
topic string
partition int32

broker *Broker
broker *brokerConsumer
messages chan *ConsumerMessage
errors chan *ConsumerError
trigger, dying chan none
Expand Down Expand Up @@ -291,15 +299,15 @@ func (child *partitionConsumer) dispatch() error {
return err
}

if leader, err := child.consumer.client.Leader(child.topic, child.partition); err != nil {
var leader *Broker
var err error
if leader, err = child.consumer.client.Leader(child.topic, child.partition); err != nil {
return err
} else {
child.broker = leader
}

brokerWorker := child.consumer.refBrokerConsumer(child.broker)
child.broker = child.consumer.refBrokerConsumer(leader)

brokerWorker.input <- child
child.broker.input <- child

return nil
}
Expand Down Expand Up @@ -463,6 +471,7 @@ func (w *brokerConsumer) updateSubscriptionCache(newSubscriptions []*partitionCo
}

func (w *brokerConsumer) abort(err error) {
w.consumer.abandonBrokerConsumer(w)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another clarification: because we only have one goroutine dealing with this broker, there is no chance of us using the broker's connection between the occurrence of the error, and this call where we abandon it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically the subscriptionManager is a separate goroutine, so there are two, but effectively yes.

_ = w.broker.Close() // we don't care about the error this might return, we already have one

for child := range w.subscriptions {
Expand Down
89 changes: 89 additions & 0 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,95 @@ func TestConsumerInterleavedClose(t *testing.T) {
seedBroker.Close()
}

// TestConsumerBounceWithReferenceOpen restarts ("bounces") the leader broker
// while two partition consumers of the same topic are attached to it, then
// sends both consumers back to the restarted broker one after the other.
// Apart from the t.Fatal calls during setup, the test makes no explicit
// assertions: it blocks on channel receives and Close calls, so a regression
// would show up as a hang rather than a failed comparison.
func TestConsumerBounceWithReferenceOpen(t *testing.T) {
seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 2)
// Remember the address so the replacement leader can be started on it.
leaderAddr := leader.Addr()

// Metadata: broker 2 leads both partition 0 and partition 1 of my_topic.
metadataResponse := new(MetadataResponse)
metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError)
seedBroker.Returns(metadataResponse)

// Errors are delivered on the Errors() channel, retries have no backoff,
// and the channel buffer size is set to zero.
config := NewConfig()
config.Consumer.Return.Errors = true
config.Consumer.Retry.Backoff = 0
config.ChannelBufferSize = 0
master, err := NewConsumer([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

c0, err := master.ConsumePartition("my_topic", 0, 0)
if err != nil {
t.Fatal(err)
}

c1, err := master.ConsumePartition("my_topic", 1, 0)
if err != nil {
t.Fatal(err)
}

// First fetch: a message for partition 0 only; wait until c0 receives it.
fetchResponse := new(FetchResponse)
fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
fetchResponse.AddError("my_topic", 1, ErrNoError)
leader.Returns(fetchResponse)
<-c0.Messages()

// Second fetch: a message for partition 1 only; wait until c1 receives it.
fetchResponse = new(FetchResponse)
fetchResponse.AddError("my_topic", 0, ErrNoError)
fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
leader.Returns(fetchResponse)
<-c1.Messages()

// Bounce the leader: close it and start a new mock broker with the same ID
// on the same address.
leader.Close()
leader = newMockBrokerAddr(t, 2, leaderAddr)

// unblock one of the two (it doesn't matter which)
select {
case <-c0.Errors():
case <-c1.Errors():
}
// send it back to the same broker
seedBroker.Returns(metadataResponse)

// Queue a fetch response on the new leader carrying a message at offset 1
// for each partition.
fetchResponse = new(FetchResponse)
fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(1))
fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(1))
leader.Returns(fetchResponse)

// NOTE(review): presumably gives the first consumer time to re-attach before
// the second one is released — confirm; a fixed sleep may be timing-sensitive.
time.Sleep(5 * time.Millisecond)

// unblock the other one
select {
case <-c0.Errors():
case <-c1.Errors():
}
// send it back to the same broker
seedBroker.Returns(metadataResponse)

// Wait for a message to arrive on either partition consumer.
select {
case <-c0.Messages():
case <-c1.Messages():
}

// Shut down: close the brokers, close both partition consumers
// concurrently, wait for both Close calls to return, then close the master.
leader.Close()
seedBroker.Close()
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
_ = c0.Close()
wg.Done()
}()
go func() {
_ = c1.Close()
wg.Done()
}()
wg.Wait()
safeClose(t, master)
}

// This example has the simplest use case of the consumer. It simply
// iterates over the messages channel using a for/range loop. Because
// a producer never stops unless requested, a signal handler is registered
Expand Down