diff --git a/consumer.go b/consumer.go index e62e13d95..87a20514a 100644 --- a/consumer.go +++ b/consumer.go @@ -3,6 +3,7 @@ package sarama import ( "fmt" "sync" + "sync/atomic" "time" ) @@ -238,6 +239,11 @@ type PartitionConsumer interface { // errors are logged and not returned over this channel. If you want to implement any custom errpr // handling, set your config's Consumer.ReturnErrors setting to true, and read from this channel. Errors() <-chan *ConsumerError + + // HighWaterMarkOffset returns the high water mark offset of the partition, i.e. the offset that will + // be used for the next message that will be produced. You can use this to determine high far behind + // the processing is. + HighWaterMarkOffset() int64 } type partitionConsumer struct { @@ -251,8 +257,9 @@ type partitionConsumer struct { errors chan *ConsumerError trigger, dying chan none - fetchSize int32 - offset int64 + fetchSize int32 + offset int64 + highWaterMarkOffset int64 } func (child *partitionConsumer) sendError(err error) { @@ -369,6 +376,10 @@ func (child *partitionConsumer) Close() error { return nil } +func (child *partitionConsumer) HighWaterMarkOffset() int64 { + return atomic.LoadInt64(&child.highWaterMarkOffset) +} + // brokerConsumer type brokerConsumer struct { @@ -538,6 +549,7 @@ func (w *brokerConsumer) handleResponse(child *partitionConsumer, block *FetchRe // we got messages, reset our fetch size in case it was increased for a previous request child.fetchSize = child.conf.Consumer.Fetch.Default + atomic.StoreInt64(&child.highWaterMarkOffset, block.HighWaterMarkOffset) incomplete := false atLeastOne := false diff --git a/consumer_test.go b/consumer_test.go index 4887f495d..3d5dbe6e5 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -65,6 +65,8 @@ func TestConsumerLatestOffset(t *testing.T) { fetchResponse := new(FetchResponse) fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), 0x010101) + block := fetchResponse.GetBlock("my_topic", 0) + block.HighWaterMarkOffset = 1234 leader.Returns(fetchResponse) master, err := NewConsumer([]string{seedBroker.Addr()}, nil) @@ -78,13 +80,25 @@ func TestConsumerLatestOffset(t *testing.T) { t.Fatal(err) } - leader.Close() - safeClose(t, consumer) + msg := <-consumer.Messages() + + // we deliver one message, so it should be one higher than we return in the OffsetResponse + if msg.Offset != 0x010101 { + t.Error("Latest message offset not fetched correctly:", msg.Offset) + } // we deliver one message, so it should be one higher than we return in the OffsetResponse + // this way it is set correctly for the next FetchRequest. if consumer.(*partitionConsumer).offset != 0x010102 { t.Error("Latest offset not fetched correctly:", consumer.(*partitionConsumer).offset) } + + if hwmo := consumer.HighWaterMarkOffset(); hwmo != 1234 { + t.Errorf("Expected high water mark offset 1234, found %d", hwmo) + } + + leader.Close() + safeClose(t, consumer) } func TestConsumerFunnyOffsets(t *testing.T) { diff --git a/functional_test.go b/functional_test.go index 37b6fa285..e622e3f16 100644 --- a/functional_test.go +++ b/functional_test.go @@ -184,3 +184,37 @@ func testProducingMessages(t *testing.T, config *Config) { safeClose(t, consumer) safeClose(t, client) } + +func TestConsumerHighWaterMarkOffset(t *testing.T) { + checkKafkaAvailability(t) + + p, err := NewSyncProducer([]string{kafkaAddr}, nil) + if err != nil { + t.Fatal(err) + } + defer safeClose(t, p) + + _, offset, err := p.SendMessage("single_partition", nil, StringEncoder("Test")) + if err != nil { + t.Fatal(err) + } + + c, err := NewConsumer([]string{kafkaAddr}, nil) + if err != nil { + t.Fatal(err) + } + defer safeClose(t, c) + + pc, err := c.ConsumePartition("single_partition", 0, OffsetOldest) + if err != nil { + t.Fatal(err) + } + + <-pc.Messages() + + if hwmo := pc.HighWaterMarkOffset(); hwmo != offset+1 { + t.Logf("Last produced offset %d; high water mark should be one higher but found %d.", offset, hwmo) + } + + safeClose(t, pc) +} diff --git a/mocks/consumer.go b/mocks/consumer.go index 0aa8389c6..6234be1d3 100644 --- a/mocks/consumer.go +++ b/mocks/consumer.go @@ -2,6 +2,7 @@ package mocks import ( "sync" + "sync/atomic" "github.com/Shopify/sarama" ) @@ -132,13 +133,13 @@ type PartitionConsumer struct { consumed bool errorsShouldBeDrained bool messagesShouldBeDrained bool + highWaterMarkOffset int64 } func (pc *PartitionConsumer) handleExpectations() { pc.l.Lock() defer pc.l.Unlock() - var offset int64 for ex := range pc.expectations { if ex.Err != nil { pc.errors <- &sarama.ConsumerError{ @@ -147,11 +148,11 @@ func (pc *PartitionConsumer) handleExpectations() { Err: ex.Err, } } else { - offset++ + atomic.AddInt64(&pc.highWaterMarkOffset, 1) ex.Msg.Topic = pc.topic ex.Msg.Partition = pc.partition - ex.Msg.Offset = offset + ex.Msg.Offset = atomic.LoadInt64(&pc.highWaterMarkOffset) pc.messages <- ex.Msg } @@ -231,6 +232,10 @@ func (pc *PartitionConsumer) Messages() <-chan *sarama.ConsumerMessage { return pc.messages } +func (pc *PartitionConsumer) HighWaterMarkOffset() int64 { + return atomic.LoadInt64(&pc.highWaterMarkOffset) + 1 +} + /////////////////////////////////////////////////// // Expectation API ///////////////////////////////////////////////////