Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Process consumed partitions asynchronously #398

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 55 additions & 37 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (c *consumer) ConsumePartition(topic string, partition int32, offset int64)
partition: partition,
messages: make(chan *ConsumerMessage, c.conf.ChannelBufferSize),
errors: make(chan *ConsumerError, c.conf.ChannelBufferSize),
trigger: make(chan none, 1),
trigger: make(chan *FetchResponse, 1),
dying: make(chan none),
fetchSize: c.conf.Consumer.Fetch.Default,
}
Expand Down Expand Up @@ -162,25 +162,15 @@ func (c *consumer) refBrokerConsumer(broker *Broker) *brokerConsumer {
c.lock.Lock()
defer c.lock.Unlock()

brokerWorker := c.brokerConsumers[broker]
if brokerWorker == nil {
brokerWorker = &brokerConsumer{
consumer: c,
broker: broker,
input: make(chan *partitionConsumer),
newSubscriptions: make(chan []*partitionConsumer),
wait: make(chan none),
subscriptions: make(map[*partitionConsumer]none),
refs: 0,
}
go withRecover(brokerWorker.subscriptionManager)
go withRecover(brokerWorker.subscriptionConsumer)
c.brokerConsumers[broker] = brokerWorker
bc := c.brokerConsumers[broker]
if bc == nil {
bc = c.newBrokerConsumer(broker)
c.brokerConsumers[broker] = bc
}

brokerWorker.refs++
bc.refs++

return brokerWorker
return bc
}

func (c *consumer) unrefBrokerConsumer(brokerWorker *brokerConsumer) {
Expand Down Expand Up @@ -245,10 +235,11 @@ type partitionConsumer struct {
topic string
partition int32

broker *brokerConsumer
messages chan *ConsumerMessage
errors chan *ConsumerError
trigger, dying chan none
broker *brokerConsumer
messages chan *ConsumerMessage
errors chan *ConsumerError
trigger chan *FetchResponse
dying chan none

fetchSize int32
offset int64
Expand All @@ -269,7 +260,22 @@ func (child *partitionConsumer) sendError(err error) {
}

func (child *partitionConsumer) dispatcher() {
for _ = range child.trigger {
for response := range child.trigger {
if response != nil {
err := child.handleResponse(response)
switch err {
case nil:
child.broker.acks <- nil
continue
case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable:
// these three are not fatal errors, but do require redispatching
child.broker.acks <- child
default:
child.sendError(err)
child.broker.acks <- child
}
}

select {
case <-child.dying:
close(child.trigger)
Expand All @@ -282,7 +288,7 @@ func (child *partitionConsumer) dispatcher() {
Logger.Printf("consumer/%s/%d finding new broker\n", child.topic, child.partition)
if err := child.dispatch(); err != nil {
child.sendError(err)
child.trigger <- none{}
child.trigger <- nil
}
}
}
Expand Down Expand Up @@ -438,13 +444,29 @@ func (child *partitionConsumer) handleResponse(response *FetchResponse) error {
type brokerConsumer struct {
consumer *consumer
broker *Broker
input chan *partitionConsumer
input, acks chan *partitionConsumer
newSubscriptions chan []*partitionConsumer
wait chan none
subscriptions map[*partitionConsumer]none
refs int
}

func (c *consumer) newBrokerConsumer(broker *Broker) *brokerConsumer {
bc := &brokerConsumer{
consumer: c,
broker: broker,
input: make(chan *partitionConsumer),
acks: make(chan *partitionConsumer),
newSubscriptions: make(chan []*partitionConsumer),
wait: make(chan none),
subscriptions: make(map[*partitionConsumer]none),
refs: 0,
}
go withRecover(bc.subscriptionManager)
go withRecover(bc.subscriptionConsumer)
return bc
}

func (w *brokerConsumer) subscriptionManager() {
var buffer []*partitionConsumer

Expand Down Expand Up @@ -508,17 +530,13 @@ func (w *brokerConsumer) subscriptionConsumer() {
}

for child := range w.subscriptions {
if err := child.handleResponse(response); err != nil {
switch err {
default:
child.sendError(err)
fallthrough
case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable:
// these three are not fatal errors, but do require redispatching
child.trigger <- none{}
delete(w.subscriptions, child)
Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n", w.broker.ID(), child.topic, child.partition, err)
}
child.trigger <- response
}
expected := len(w.subscriptions)
for i := 0; i < expected; i++ {
if child := <-w.acks; child != nil {
delete(w.subscriptions, child)
Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d\n", w.broker.ID(), child.topic, child.partition)
}
}
}
Expand Down Expand Up @@ -548,13 +566,13 @@ func (w *brokerConsumer) abort(err error) {

for child := range w.subscriptions {
child.sendError(err)
child.trigger <- none{}
child.trigger <- nil
}

for newSubscription := range w.newSubscriptions {
for _, child := range newSubscription {
child.sendError(err)
child.trigger <- none{}
child.trigger <- nil
}
}
}
Expand Down
37 changes: 18 additions & 19 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ func TestConsumerOffsetManual(t *testing.T) {
metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
seedBroker.Returns(metadataResponse)

for i := 0; i <= 10; i++ {
for i := 0; i < 10; i++ {
fetchResponse := new(FetchResponse)
fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+1234))
leader.Returns(fetchResponse)
}

master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
config := NewConfig()
config.Consumer.Return.Errors = true
master, err := NewConsumer([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -176,6 +178,7 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
if message.Partition != partition {
t.Error("Incorrect message partition!")
}
Logger.Println(partition, i)
}
safeClose(t, consumer)
wg.Done()
Expand Down Expand Up @@ -237,26 +240,16 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(9))
leader0.Returns(fetchResponse)

// leader0 provides last message on partition 1
fetchResponse = new(FetchResponse)
fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(10))
leader0.Returns(fetchResponse)

// leader1 provides last message on partition 0
fetchResponse = new(FetchResponse)
fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(10))
leader1.Returns(fetchResponse)

wg.Wait()
leader1.Close()
leader0.Close()
seedBroker.Close()

wg.Wait()

safeClose(t, master)
}

func TestConsumerInterleavedClose(t *testing.T) {
t.Skip("Enable once bug #325 is fixed.")

seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 2)

Expand All @@ -278,16 +271,22 @@ func TestConsumerInterleavedClose(t *testing.T) {
t.Fatal(err)
}

fetchResponse := new(FetchResponse)
fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
leader.Returns(fetchResponse)
time.Sleep(50 * time.Millisecond)

c1, err := master.ConsumePartition("my_topic", 1, 0)
if err != nil {
t.Fatal(err)
}

fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
fetchResponse := new(FetchResponse)
fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), 0)
leader.Returns(fetchResponse)

<-c0.Messages()

fetchResponse = new(FetchResponse)
fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), 1)
fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), 0)
leader.Returns(fetchResponse)

safeClose(t, c1)
Expand Down