Skip to content

Commit

Permalink
Merge pull request #339 from Shopify/consumer_highwatermark
Browse files Browse the repository at this point in the history
High water mark offset support to the consumer
  • Loading branch information
wvanbergen committed Apr 27, 2015
2 parents d0c297e + 5f78d90 commit 197f620
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 11 deletions.
16 changes: 14 additions & 2 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sarama
import (
"fmt"
"sync"
"sync/atomic"
"time"
)

Expand Down Expand Up @@ -255,6 +256,11 @@ type PartitionConsumer interface {
// errors are logged and not returned over this channel. If you want to implement any custom errpr
// handling, set your config's Consumer.Return.Errors setting to true, and read from this channel.
Errors() <-chan *ConsumerError

// HighWaterMarkOffset returns the high water mark offset of the partition, i.e. the offset that will
// be used for the next message that will be produced. You can use this to determine how far behind
// the processing is.
HighWaterMarkOffset() int64
}

type partitionConsumer struct {
Expand All @@ -268,8 +274,9 @@ type partitionConsumer struct {
errors chan *ConsumerError
trigger, dying chan none

fetchSize int32
offset int64
fetchSize int32
offset int64
highWaterMarkOffset int64
}

func (child *partitionConsumer) sendError(err error) {
Expand Down Expand Up @@ -391,6 +398,10 @@ func (child *partitionConsumer) Close() error {
return nil
}

func (child *partitionConsumer) HighWaterMarkOffset() int64 {
return atomic.LoadInt64(&child.highWaterMarkOffset)
}

func (child *partitionConsumer) handleResponse(response *FetchResponse) error {
block := response.GetBlock(child.topic, child.partition)
if block == nil {
Expand Down Expand Up @@ -422,6 +433,7 @@ func (child *partitionConsumer) handleResponse(response *FetchResponse) error {

// we got messages, reset our fetch size in case it was increased for a previous request
child.fetchSize = child.conf.Consumer.Fetch.Default
atomic.StoreInt64(&child.highWaterMarkOffset, block.HighWaterMarkOffset)

incomplete := false
atLeastOne := false
Expand Down
26 changes: 20 additions & 6 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestConsumerOffsetManual(t *testing.T) {
leader.Close()
}

func TestConsumerLatestOffset(t *testing.T) {
func TestConsumerOffsetNewest(t *testing.T) {
seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 2)

Expand All @@ -69,15 +69,17 @@ func TestConsumerLatestOffset(t *testing.T) {
seedBroker.Returns(metadataResponse)

offsetResponseNewest := new(OffsetResponse)
offsetResponseNewest.AddTopicPartition("my_topic", 0, 0x010102)
offsetResponseNewest.AddTopicPartition("my_topic", 0, 10)
leader.Returns(offsetResponseNewest)

offsetResponseOldest := new(OffsetResponse)
offsetResponseOldest.AddTopicPartition("my_topic", 0, 0x010101)
offsetResponseOldest.AddTopicPartition("my_topic", 0, 7)
leader.Returns(offsetResponseOldest)

fetchResponse := new(FetchResponse)
fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), 0x010101)
fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), 10)
block := fetchResponse.GetBlock("my_topic", 0)
block.HighWaterMarkOffset = 14
leader.Returns(fetchResponse)

master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
Expand All @@ -91,12 +93,24 @@ func TestConsumerLatestOffset(t *testing.T) {
t.Fatal(err)
}

msg := <-consumer.Messages()

// we deliver one message, so it should be one higher than we return in the OffsetResponse
if msg.Offset != 10 {
t.Error("Latest message offset not fetched correctly:", msg.Offset)
}

if hwmo := consumer.HighWaterMarkOffset(); hwmo != 14 {
t.Errorf("Expected high water mark offset 14, found %d", hwmo)
}

leader.Close()
safeClose(t, consumer)
safeClose(t, master)

// we deliver one message, so it should be one higher than we return in the OffsetResponse
if consumer.(*partitionConsumer).offset != 0x010102 {
// We deliver one message, so it should be one higher than we return in the OffsetResponse.
// This way it is set correctly for the next FetchRequest.
if consumer.(*partitionConsumer).offset != 11 {
t.Error("Latest offset not fetched correctly:", consumer.(*partitionConsumer).offset)
}
}
Expand Down
34 changes: 34 additions & 0 deletions functional_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,37 @@ func TestFuncConsumerOffsetOutOfRange(t *testing.T) {

safeClose(t, consumer)
}

func TestConsumerHighWaterMarkOffset(t *testing.T) {
checkKafkaAvailability(t)

p, err := NewSyncProducer(kafkaBrokers, nil)
if err != nil {
t.Fatal(err)
}
defer safeClose(t, p)

_, offset, err := p.SendMessage(&ProducerMessage{Topic: "test.1", Value: StringEncoder("Test")})
if err != nil {
t.Fatal(err)
}

c, err := NewConsumer(kafkaBrokers, nil)
if err != nil {
t.Fatal(err)
}
defer safeClose(t, c)

pc, err := c.ConsumePartition("test.1", 0, OffsetOldest)
if err != nil {
t.Fatal(err)
}

<-pc.Messages()

if hwmo := pc.HighWaterMarkOffset(); hwmo != offset+1 {
t.Logf("Last produced offset %d; high water mark should be one higher but found %d.", offset, hwmo)
}

safeClose(t, pc)
}
11 changes: 8 additions & 3 deletions mocks/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package mocks

import (
"sync"
"sync/atomic"

"github.com/Shopify/sarama"
)
Expand Down Expand Up @@ -175,13 +176,13 @@ type PartitionConsumer struct {
consumed bool
errorsShouldBeDrained bool
messagesShouldBeDrained bool
highWaterMarkOffset int64
}

func (pc *PartitionConsumer) handleExpectations() {
pc.l.Lock()
defer pc.l.Unlock()

var offset int64
for ex := range pc.expectations {
if ex.Err != nil {
pc.errors <- &sarama.ConsumerError{
Expand All @@ -190,11 +191,11 @@ func (pc *PartitionConsumer) handleExpectations() {
Err: ex.Err,
}
} else {
offset++
atomic.AddInt64(&pc.highWaterMarkOffset, 1)

ex.Msg.Topic = pc.topic
ex.Msg.Partition = pc.partition
ex.Msg.Offset = offset
ex.Msg.Offset = atomic.LoadInt64(&pc.highWaterMarkOffset)

pc.messages <- ex.Msg
}
Expand Down Expand Up @@ -274,6 +275,10 @@ func (pc *PartitionConsumer) Messages() <-chan *sarama.ConsumerMessage {
return pc.messages
}

func (pc *PartitionConsumer) HighWaterMarkOffset() int64 {
return atomic.LoadInt64(&pc.highWaterMarkOffset) + 1
}

///////////////////////////////////////////////////
// Expectation API
///////////////////////////////////////////////////
Expand Down

0 comments on commit 197f620

Please sign in to comment.