Add high water mark offset support to the consumer.
wvanbergen committed Mar 12, 2015
1 parent 35543a8 commit 3f853c6
Showing 4 changed files with 72 additions and 7 deletions.
16 changes: 14 additions & 2 deletions consumer.go
@@ -3,6 +3,7 @@ package sarama
import (
"fmt"
"sync"
"sync/atomic"
"time"
)

@@ -238,6 +239,11 @@ type PartitionConsumer interface {
// errors are logged and not returned over this channel. If you want to implement any custom error
// handling, set your config's Consumer.ReturnErrors setting to true, and read from this channel.
Errors() <-chan *ConsumerError

// HighWaterMarkOffset returns the high water mark offset of the partition, i.e. the offset that will
// be used for the next message that will be produced. You can use this to determine how far behind
// the processing is.
HighWaterMarkOffset() int64
}

type partitionConsumer struct {
@@ -251,8 +257,9 @@ type partitionConsumer struct {
errors chan *ConsumerError
trigger, dying chan none

fetchSize int32
offset int64
fetchSize int32
offset int64
highWaterMarkOffset int64
}

func (child *partitionConsumer) sendError(err error) {
@@ -369,6 +376,10 @@ func (child *partitionConsumer) Close() error {
return nil
}

func (child *partitionConsumer) HighWaterMarkOffset() int64 {
return atomic.LoadInt64(&child.highWaterMarkOffset)
}

// brokerConsumer

type brokerConsumer struct {
@@ -538,6 +549,7 @@ func (w *brokerConsumer) handleResponse(child *partitionConsumer, block *FetchRe

// we got messages, reset our fetch size in case it was increased for a previous request
child.fetchSize = child.conf.Consumer.Fetch.Default
atomic.StoreInt64(&child.highWaterMarkOffset, block.HighWaterMarkOffset)

incomplete := false
atLeastOne := false
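The new `HighWaterMarkOffset()` accessor is what makes the lag estimate mentioned in the doc comment possible: since it reports the offset that will be used for the next produced message, the number of messages still waiting after the one just consumed is `HighWaterMarkOffset() - msg.Offset - 1`. A minimal sketch of that usage, not part of this commit; the broker address and topic name are illustrative:

```go
package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	pc, err := consumer.ConsumePartition("my_topic", 0, sarama.OffsetOldest)
	if err != nil {
		log.Fatal(err)
	}
	defer pc.Close()

	for msg := range pc.Messages() {
		// The high water mark is the offset that will be assigned to the *next*
		// produced message, so everything after the message we just read is lag.
		lag := pc.HighWaterMarkOffset() - msg.Offset - 1
		log.Printf("consumed offset %d; roughly %d messages behind", msg.Offset, lag)
	}
}
```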
18 changes: 16 additions & 2 deletions consumer_test.go
@@ -65,6 +65,8 @@ func TestConsumerLatestOffset(t *testing.T) {

fetchResponse := new(FetchResponse)
fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), 0x010101)
block := fetchResponse.GetBlock("my_topic", 0)
block.HighWaterMarkOffset = 1234
leader.Returns(fetchResponse)

master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
@@ -78,13 +80,25 @@
t.Fatal(err)
}

leader.Close()
safeClose(t, consumer)
msg := <-consumer.Messages()

// we deliver one message, so it should be one higher than we return in the OffsetResponse
if msg.Offset != 0x010101 {
t.Error("Latest message offset not fetched correctly:", msg.Offset)
}

// we deliver one message, so it should be one higher than we return in the OffsetResponse
// this way it is set correctly for the next FetchRequest.
if consumer.(*partitionConsumer).offset != 0x010102 {
t.Error("Latest offset not fetched correctly:", consumer.(*partitionConsumer).offset)
}

if hwmo := consumer.HighWaterMarkOffset(); hwmo != 1234 {
t.Errorf("Expected high water mark offset 1234, found %d", hwmo)
}

leader.Close()
safeClose(t, consumer)
}

func TestConsumerFunnyOffsets(t *testing.T) {
34 changes: 34 additions & 0 deletions functional_test.go
@@ -184,3 +184,37 @@ func testProducingMessages(t *testing.T, config *Config) {
safeClose(t, consumer)
safeClose(t, client)
}

func TestConsumerHighWaterMarkOffset(t *testing.T) {
checkKafkaAvailability(t)

p, err := NewSyncProducer([]string{kafkaAddr}, nil)
if err != nil {
t.Fatal(err)
}
defer safeClose(t, p)

_, offset, err := p.SendMessage("single_partition", nil, StringEncoder("Test"))
if err != nil {
t.Fatal(err)
}

c, err := NewConsumer([]string{kafkaAddr}, nil)
if err != nil {
t.Fatal(err)
}
defer safeClose(t, c)

pc, err := c.ConsumePartition("single_partition", 0, OffsetOldest)
if err != nil {
t.Fatal(err)
}

<-pc.Messages()

if hwmo := pc.HighWaterMarkOffset(); hwmo != offset+1 {
t.Logf("Last produced offset %d; high water mark should be one higher but found %d.", offset, hwmo)
}

safeClose(t, pc)
}
11 changes: 8 additions & 3 deletions mocks/consumer.go
@@ -2,6 +2,7 @@ package mocks

import (
"sync"
"sync/atomic"

"github.com/Shopify/sarama"
)
@@ -132,13 +133,13 @@ type PartitionConsumer struct {
consumed bool
errorsShouldBeDrained bool
messagesShouldBeDrained bool
highWaterMarkOffset int64
}

func (pc *PartitionConsumer) handleExpectations() {
pc.l.Lock()
defer pc.l.Unlock()

var offset int64
for ex := range pc.expectations {
if ex.Err != nil {
pc.errors <- &sarama.ConsumerError{
@@ -147,11 +148,11 @@ func (pc *PartitionConsumer) handleExpectations() {
Err: ex.Err,
}
} else {
offset++
atomic.AddInt64(&pc.highWaterMarkOffset, 1)

ex.Msg.Topic = pc.topic
ex.Msg.Partition = pc.partition
ex.Msg.Offset = offset
ex.Msg.Offset = atomic.LoadInt64(&pc.highWaterMarkOffset)

pc.messages <- ex.Msg
}
@@ -231,6 +232,10 @@ func (pc *PartitionConsumer) Messages() <-chan *sarama.ConsumerMessage {
return pc.messages
}

func (pc *PartitionConsumer) HighWaterMarkOffset() int64 {
return atomic.LoadInt64(&pc.highWaterMarkOffset) + 1
}

///////////////////////////////////////////////////
// Expectation API
///////////////////////////////////////////////////
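The mock mirrors the same semantics: it bumps an internal counter for every message it hands out and reports that counter plus one, so tests can exercise the lag arithmetic against `mocks.PartitionConsumer` exactly as they would against the real consumer. A rough sketch of such a test follows; it uses the `NewConsumer` / `ExpectConsumePartition` / `YieldMessage` helpers as they exist in later versions of the mocks package, which may not match the expectation API at this exact revision:

```go
package mypackage_test

import (
	"testing"

	"github.com/Shopify/sarama"
	"github.com/Shopify/sarama/mocks"
)

func TestLagAgainstMockConsumer(t *testing.T) {
	// Helper names below follow the present-day mocks API; treat them as assumptions.
	consumer := mocks.NewConsumer(t, nil)
	defer consumer.Close()

	pc := consumer.ExpectConsumePartition("my_topic", 0, sarama.OffsetOldest)
	pc.YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello")})

	// The code under test would normally call ConsumePartition itself.
	partitionConsumer, err := consumer.ConsumePartition("my_topic", 0, sarama.OffsetOldest)
	if err != nil {
		t.Fatal(err)
	}

	msg := <-partitionConsumer.Messages()
	if lag := partitionConsumer.HighWaterMarkOffset() - msg.Offset - 1; lag != 0 {
		t.Errorf("expected no lag after draining all yielded messages, got %d", lag)
	}
}
```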
