Process results of a single fetch request in parallel
And abandon partitions that take too long (e.g. because the client is stuck).
eapache committed Mar 23, 2015
1 parent 94a77f7 commit 77ac8a6
Showing 2 changed files with 117 additions and 97 deletions.
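
Before the diff, a minimal self-contained sketch of the pattern this commit introduces (the `partition` type, `handler`, and the inline `process` callback below are hypothetical stand-ins, not Sarama's actual API): instead of handling each subscribed partition serially after a fetch, the broker loop broadcasts the response to a per-partition goroutine, then collects each result with a one-second deadline and abandons any partition that does not answer in time.

```go
package main

import (
	"fmt"
	"time"
)

// partition stands in for sarama's partitionConsumer: the broker loop feeds
// responses in and reads per-partition results back out.
type partition struct {
	id        int
	responses chan string // the real code passes *FetchResponse
	results   chan error
}

// handler mirrors partitionConsumer.responseHandler: process every response
// (possibly slowly, e.g. behind a stuck client) and report the outcome.
func (p *partition) handler(process func(string) error) {
	for r := range p.responses {
		p.results <- process(r)
	}
}

func main() {
	subscriptions := map[*partition]bool{}
	for i := 0; i < 3; i++ {
		p := &partition{id: i, responses: make(chan string, 1), results: make(chan error, 1)}
		go p.handler(func(string) error {
			if p.id == 2 {
				time.Sleep(2 * time.Second) // simulate a consumer that is stuck
			}
			return nil
		})
		subscriptions[p] = true
	}

	response := "one fetch response covering every subscribed partition"

	// Phase 1: fan the response out so all partitions process in parallel.
	for p := range subscriptions {
		p.responses <- response
	}

	// Phase 2: collect results, abandoning any partition that takes too long.
	for p := range subscriptions {
		select {
		case err := <-p.results:
			fmt.Printf("partition %d done: %v\n", p.id, err)
		case <-time.After(1 * time.Second):
			delete(subscriptions, p)
			fmt.Printf("partition %d abandoned: not being consumed\n", p.id)
		}
	}
}
```

The buffered channels of size 1 (matching the diff below) let the broadcast itself complete without waiting for each handler to pick up the response. In the real change an abandoned partition is also drained and re-triggered in the background (see the note after consumer.go below), so a stuck client costs its siblings roughly a second rather than stalling the whole broker loop indefinitely.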
190 changes: 111 additions & 79 deletions consumer.go
@@ -106,6 +106,8 @@ func (c *consumer) ConsumePartition(topic string, partition int32, offset int64)
 		partition: partition,
 		messages:  make(chan *ConsumerMessage, c.conf.ChannelBufferSize),
 		errors:    make(chan *ConsumerError, c.conf.ChannelBufferSize),
+		responses: make(chan *FetchResponse, 1),
+		results:   make(chan error, 1),
 		trigger:   make(chan none, 1),
 		dying:     make(chan none),
 		fetchSize: c.conf.Consumer.Fetch.Default,
@@ -126,6 +128,7 @@ func (c *consumer) ConsumePartition(topic string, partition int32, offset int64)
 	}
 
 	go withRecover(child.dispatcher)
+	go withRecover(child.responseHandler)
 
 	child.broker = c.refBrokerConsumer(leader)
 	child.broker.input <- child
@@ -245,9 +248,12 @@ type partitionConsumer struct {
 	topic     string
 	partition int32
 
-	broker         *brokerConsumer
-	messages       chan *ConsumerMessage
-	errors         chan *ConsumerError
+	broker   *brokerConsumer
+	messages chan *ConsumerMessage
+	errors   chan *ConsumerError
+
+	responses      chan *FetchResponse
+	results        chan error
 	trigger, dying chan none
 
 	fetchSize int32
@@ -293,6 +299,7 @@ func (child *partitionConsumer) dispatcher() {
 	child.consumer.removeChild(child)
 	close(child.messages)
 	close(child.errors)
+	close(child.responses)
 }
 
 func (child *partitionConsumer) dispatch() error {
@@ -367,6 +374,79 @@ func (child *partitionConsumer) Close() error {
 	return nil
 }
 
+func (child *partitionConsumer) responseHandler() {
+	for response := range child.responses {
+		ret := child.handleResponse(response)
+		child.results <- ret
+	}
+}
+
+func (child *partitionConsumer) handleResponse(response *FetchResponse) error {
+	block := response.GetBlock(child.topic, child.partition)
+	if block == nil {
+		return ErrIncompleteResponse
+	}
+
+	if block.Err != ErrNoError {
+		return block.Err
+	}
+
+	if len(block.MsgSet.Messages) == 0 {
+		// We got no messages. If we got a trailing one then we need to ask for more data.
+		// Otherwise we just poll again and wait for one to be produced...
+		if block.MsgSet.PartialTrailingMessage {
+			if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize == child.conf.Consumer.Fetch.Max {
+				// we can't ask for more data, we've hit the configured limit
+				child.sendError(ErrMessageTooLarge)
+				child.offset++ // skip this one so we can keep processing future messages
+			} else {
+				child.fetchSize *= 2
+				if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize > child.conf.Consumer.Fetch.Max {
+					child.fetchSize = child.conf.Consumer.Fetch.Max
+				}
+			}
+		}
+
+		return nil
+	}
+
+	// we got messages, reset our fetch size in case it was increased for a previous request
+	child.fetchSize = child.conf.Consumer.Fetch.Default
+
+	incomplete := false
+	atLeastOne := false
+	prelude := true
+	for _, msgBlock := range block.MsgSet.Messages {
+
+		for _, msg := range msgBlock.Messages() {
+			if prelude && msg.Offset < child.offset {
+				continue
+			}
+			prelude = false
+
+			if msg.Offset >= child.offset {
+				atLeastOne = true
+				child.messages <- &ConsumerMessage{
+					Topic:     child.topic,
+					Partition: child.partition,
+					Key:       msg.Msg.Key,
+					Value:     msg.Msg.Value,
+					Offset:    msg.Offset,
+				}
+				child.offset = msg.Offset + 1
+			} else {
+				incomplete = true
+			}
+		}
+
+	}
+
+	if incomplete || !atLeastOne {
+		return ErrIncompleteResponse
+	}
+	return nil
+}
+
 // brokerConsumer
 
 type brokerConsumer struct {
@@ -442,17 +522,35 @@ func (w *brokerConsumer) subscriptionConsumer() {
 		}
 
 		for child := range w.subscriptions {
-			if err := w.handleResponse(child, response); err != nil {
-				switch err {
-				default:
-					child.sendError(err)
-					fallthrough
-				case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable:
-					// these three are not fatal errors, but do require redispatching
-					child.trigger <- none{}
-					delete(w.subscriptions, child)
-					Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n", w.broker.ID(), child.topic, child.partition, err)
-				}
+			child.responses <- response
+		}
+		for child := range w.subscriptions {
+			select {
+			case err := <-child.results:
+				if err != nil {
+					switch err {
+					default:
+						child.sendError(err)
+						fallthrough
+					case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable:
+						// these three are not fatal errors, but do require redispatching
+						child.trigger <- none{}
+						delete(w.subscriptions, child)
+						Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n", w.broker.ID(), child.topic, child.partition, err)
+					}
+				}
+			case <-time.After(1 * time.Second):
+				delete(w.subscriptions, child)
+				Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because it wasn't being consumed\n", w.broker.ID(), child.topic, child.partition)
+				go func(child *partitionConsumer) {
+					switch err := <-child.results; err {
+					case nil, ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable:
+						break
+					default:
+						child.sendError(err)
+					}
+					child.trigger <- none{}
+				}(child)
 			}
 		}
 	}
@@ -505,69 +603,3 @@ func (w *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
 
 	return w.broker.Fetch(request)
 }
-
-func (w *brokerConsumer) handleResponse(child *partitionConsumer, response *FetchResponse) error {
-	block := response.GetBlock(child.topic, child.partition)
-	if block == nil {
-		return ErrIncompleteResponse
-	}
-
-	if block.Err != ErrNoError {
-		return block.Err
-	}
-
-	if len(block.MsgSet.Messages) == 0 {
-		// We got no messages. If we got a trailing one then we need to ask for more data.
-		// Otherwise we just poll again and wait for one to be produced...
-		if block.MsgSet.PartialTrailingMessage {
-			if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize == child.conf.Consumer.Fetch.Max {
-				// we can't ask for more data, we've hit the configured limit
-				child.sendError(ErrMessageTooLarge)
-				child.offset++ // skip this one so we can keep processing future messages
-			} else {
-				child.fetchSize *= 2
-				if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize > child.conf.Consumer.Fetch.Max {
-					child.fetchSize = child.conf.Consumer.Fetch.Max
-				}
-			}
-		}
-
-		return nil
-	}
-
-	// we got messages, reset our fetch size in case it was increased for a previous request
-	child.fetchSize = child.conf.Consumer.Fetch.Default
-
-	incomplete := false
-	atLeastOne := false
-	prelude := true
-	for _, msgBlock := range block.MsgSet.Messages {
-
-		for _, msg := range msgBlock.Messages() {
-			if prelude && msg.Offset < child.offset {
-				continue
-			}
-			prelude = false
-
-			if msg.Offset >= child.offset {
-				atLeastOne = true
-				child.messages <- &ConsumerMessage{
-					Topic:     child.topic,
-					Partition: child.partition,
-					Key:       msg.Msg.Key,
-					Value:     msg.Msg.Value,
-					Offset:    msg.Offset,
-				}
-				child.offset = msg.Offset + 1
-			} else {
-				incomplete = true
-			}
-		}
-
-	}
-
-	if incomplete || !atLeastOne {
-		return ErrIncompleteResponse
-	}
-	return nil
-}
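
A note on the abandonment branch above: when the one-second timer fires, the partition's responseHandler may still be mid-processing and will eventually deliver a result. The spawned goroutine drains that late result, forwards any real error to the client (the redispatch errors are whitelisted, as in the in-time path), and fires the trigger so the dispatcher can re-subscribe the partition. A toy version of just that branch, with hypothetical names rather than Sarama's API:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// collect waits up to a second for one partition's result; if the handler is
// still busy, the bookkeeping moves to a background goroutine instead of
// blocking the shared broker loop.
func collect(results chan error) {
	select {
	case err := <-results:
		fmt.Println("result in time:", err)
	case <-time.After(1 * time.Second):
		fmt.Println("abandoned; draining in background")
		go func() {
			if err := <-results; err != nil {
				fmt.Println("late error:", err) // real code: child.sendError(err)
			}
			fmt.Println("re-trigger dispatch") // real code: child.trigger <- none{}
		}()
	}
}

func main() {
	results := make(chan error, 1)
	go func() {
		time.Sleep(2 * time.Second) // a handler stuck behind a slow client
		results <- errors.New("fetch processing failed")
	}()
	collect(results)
	time.Sleep(2 * time.Second) // give the background drain time to run
}
```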
24 changes: 6 additions & 18 deletions consumer_test.go
@@ -46,9 +46,9 @@ func TestConsumerOffsetManual(t *testing.T) {
 		}
 	}
 
-	leader.Close()
 	safeClose(t, consumer)
 	safeClose(t, master)
+	leader.Close()
 }
 
 func TestConsumerLatestOffset(t *testing.T) {
@@ -237,26 +237,14 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
 	fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(9))
 	leader0.Returns(fetchResponse)
 
-	// leader0 provides last message on partition 1
-	fetchResponse = new(FetchResponse)
-	fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(10))
-	leader0.Returns(fetchResponse)
-
-	// leader1 provides last message on partition 0
-	fetchResponse = new(FetchResponse)
-	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(10))
-	leader1.Returns(fetchResponse)
-
-	wg.Wait()
 	leader1.Close()
 	leader0.Close()
 	seedBroker.Close()
+	wg.Wait()
 	safeClose(t, master)
 }
 
 func TestConsumerInterleavedClose(t *testing.T) {
-	t.Skip("Enable once bug #325 is fixed.")
-
 	seedBroker := newMockBroker(t, 1)
 	leader := newMockBroker(t, 2)
 
@@ -278,15 +266,15 @@ func TestConsumerInterleavedClose(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	fetchResponse := new(FetchResponse)
-	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
-	leader.Returns(fetchResponse)
-
 	c1, err := master.ConsumePartition("my_topic", 1, 0)
 	if err != nil {
 		t.Fatal(err)
 	}
 
+	time.Sleep(50 * time.Millisecond)
+
+	fetchResponse := new(FetchResponse)
+	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
+	fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
+	leader.Returns(fetchResponse)
 