diff --git a/consumer.go b/consumer.go index 4c2b1d7b5..38456a9b2 100644 --- a/consumer.go +++ b/consumer.go @@ -106,7 +106,7 @@ func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) partition: partition, messages: make(chan *ConsumerMessage, c.conf.ChannelBufferSize), errors: make(chan *ConsumerError, c.conf.ChannelBufferSize), - trigger: make(chan none, 1), + trigger: make(chan *FetchResponse, 1), dying: make(chan none), fetchSize: c.conf.Consumer.Fetch.Default, } @@ -162,25 +162,15 @@ func (c *consumer) refBrokerConsumer(broker *Broker) *brokerConsumer { c.lock.Lock() defer c.lock.Unlock() - brokerWorker := c.brokerConsumers[broker] - if brokerWorker == nil { - brokerWorker = &brokerConsumer{ - consumer: c, - broker: broker, - input: make(chan *partitionConsumer), - newSubscriptions: make(chan []*partitionConsumer), - wait: make(chan none), - subscriptions: make(map[*partitionConsumer]none), - refs: 0, - } - go withRecover(brokerWorker.subscriptionManager) - go withRecover(brokerWorker.subscriptionConsumer) - c.brokerConsumers[broker] = brokerWorker + bc := c.brokerConsumers[broker] + if bc == nil { + bc = c.newBrokerConsumer(broker) + c.brokerConsumers[broker] = bc } - brokerWorker.refs++ + bc.refs++ - return brokerWorker + return bc } func (c *consumer) unrefBrokerConsumer(brokerWorker *brokerConsumer) { @@ -245,10 +235,11 @@ type partitionConsumer struct { topic string partition int32 - broker *brokerConsumer - messages chan *ConsumerMessage - errors chan *ConsumerError - trigger, dying chan none + broker *brokerConsumer + messages chan *ConsumerMessage + errors chan *ConsumerError + trigger chan *FetchResponse + dying chan none fetchSize int32 offset int64 @@ -269,7 +260,22 @@ func (child *partitionConsumer) sendError(err error) { } func (child *partitionConsumer) dispatcher() { - for _ = range child.trigger { + for response := range child.trigger { + if response != nil { + err := child.handleResponse(response) + switch err { + case nil: + child.broker.acks <- nil + continue + case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable: + // these three are not fatal errors, but do require redispatching + child.broker.acks <- child + default: + child.sendError(err) + child.broker.acks <- child + } + } + select { case <-child.dying: close(child.trigger) @@ -282,7 +288,7 @@ func (child *partitionConsumer) dispatcher() { Logger.Printf("consumer/%s/%d finding new broker\n", child.topic, child.partition) if err := child.dispatch(); err != nil { child.sendError(err) - child.trigger <- none{} + child.trigger <- nil } } } @@ -438,13 +444,29 @@ func (child *partitionConsumer) handleResponse(response *FetchResponse) error { type brokerConsumer struct { consumer *consumer broker *Broker - input chan *partitionConsumer + input, acks chan *partitionConsumer newSubscriptions chan []*partitionConsumer wait chan none subscriptions map[*partitionConsumer]none refs int } +func (c *consumer) newBrokerConsumer(broker *Broker) *brokerConsumer { + bc := &brokerConsumer{ + consumer: c, + broker: broker, + input: make(chan *partitionConsumer), + acks: make(chan *partitionConsumer), + newSubscriptions: make(chan []*partitionConsumer), + wait: make(chan none), + subscriptions: make(map[*partitionConsumer]none), + refs: 0, + } + go withRecover(bc.subscriptionManager) + go withRecover(bc.subscriptionConsumer) + return bc +} + func (w *brokerConsumer) subscriptionManager() { var buffer []*partitionConsumer @@ -508,17 +530,13 @@ func (w *brokerConsumer) subscriptionConsumer() { } for child := range w.subscriptions { - if err := child.handleResponse(response); err != nil { - switch err { - default: - child.sendError(err) - fallthrough - case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable: - // these three are not fatal errors, but do require redispatching - child.trigger <- none{} - delete(w.subscriptions, child) - Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n", w.broker.ID(), child.topic, child.partition, err) - } + child.trigger <- response + } + expected := len(w.subscriptions) + for i := 0; i < expected; i++ { + if child := <-w.acks; child != nil { + delete(w.subscriptions, child) + Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d\n", w.broker.ID(), child.topic, child.partition) } } } @@ -548,13 +566,13 @@ func (w *brokerConsumer) abort(err error) { for child := range w.subscriptions { child.sendError(err) - child.trigger <- none{} + child.trigger <- nil } for newSubscription := range w.newSubscriptions { for _, child := range newSubscription { child.sendError(err) - child.trigger <- none{} + child.trigger <- nil } } } diff --git a/consumer_test.go b/consumer_test.go index bfae552de..0d6aa3655 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -18,13 +18,15 @@ func TestConsumerOffsetManual(t *testing.T) { metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError) seedBroker.Returns(metadataResponse) - for i := 0; i <= 10; i++ { + for i := 0; i < 10; i++ { fetchResponse := new(FetchResponse) fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+1234)) leader.Returns(fetchResponse) } - master, err := NewConsumer([]string{seedBroker.Addr()}, nil) + config := NewConfig() + config.Consumer.Return.Errors = true + master, err := NewConsumer([]string{seedBroker.Addr()}, config) if err != nil { t.Fatal(err) } @@ -176,6 +178,7 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) { if message.Partition != partition { t.Error("Incorrect message partition!") } + Logger.Println(partition, i) } safeClose(t, consumer) wg.Done() @@ -237,26 +240,16 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) { fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(9)) leader0.Returns(fetchResponse) - // leader0 provides last message on partition 1 - fetchResponse = new(FetchResponse) - fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(10)) - leader0.Returns(fetchResponse) - - // leader1 provides last message on partition 0 - fetchResponse = new(FetchResponse) - fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(10)) - leader1.Returns(fetchResponse) - - wg.Wait() leader1.Close() leader0.Close() seedBroker.Close() + + wg.Wait() + safeClose(t, master) } func TestConsumerInterleavedClose(t *testing.T) { - t.Skip("Enable once bug #325 is fixed.") - seedBroker := newMockBroker(t, 1) leader := newMockBroker(t, 2) @@ -278,16 +271,22 @@ func TestConsumerInterleavedClose(t *testing.T) { t.Fatal(err) } - fetchResponse := new(FetchResponse) - fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0)) - leader.Returns(fetchResponse) + time.Sleep(50 * time.Millisecond) c1, err := master.ConsumePartition("my_topic", 1, 0) if err != nil { t.Fatal(err) } - fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0)) + fetchResponse := new(FetchResponse) + fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), 0) + leader.Returns(fetchResponse) + + <-c0.Messages() + + fetchResponse = new(FetchResponse) + fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), 1) + fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), 0) leader.Returns(fetchResponse) safeClose(t, c1)