Add high water mark offset support to the consumer.
wvanbergen committed Mar 12, 2015
1 parent 3e3929d commit 3a200f5
Showing 10 changed files with 203 additions and 61 deletions.
25 changes: 20 additions & 5 deletions config.go
@@ -42,7 +42,9 @@ type Config struct {
// Similar to the `partitioner.class` setting for the JVM producer.
Partitioner PartitionerConstructor
// If enabled, successfully delivered messages will be returned on the Successes channel (default disabled).
AckSuccesses bool
ReturnSuccesses bool
// If enabled, messages that failed to deliver will be returned on the Errors channel, including the error that caused the failure (default enabled).
ReturnErrors bool

// The following config options control how often messages are batched up and sent to the broker. By default,
// messages are sent as fast as possible, and all messages received while the current batch is in-flight are placed
@@ -68,6 +70,11 @@ type Config struct {

// Consumer is the namespace for configuration related to consuming messages, used by the Consumer.
Consumer struct {
Retry struct {
// How long to wait after a failed read from a partition before trying again (default 2s).
Backoff time.Duration
}

// Fetch is the namespace for controlling how many bytes are retrieved by any given request.
Fetch struct {
// The minimum number of message bytes to fetch in a request - the broker will wait until at least this many are available.
@@ -87,6 +94,9 @@ type Config struct {
// 100-500ms is a reasonable range for most cases. Kafka only supports precision up to milliseconds; nanoseconds will be truncated.
// Equivalent to the JVM's `fetch.wait.max.ms`.
MaxWaitTime time.Duration

// If enabled, any errors that occurred while consuming are returned on the Errors channel (default disabled).
ReturnErrors bool
}

// A user-provided string sent with every request to the brokers for logging, debugging, and auditing purposes.
@@ -117,10 +127,13 @@ func NewConfig() *Config {
c.Producer.Partitioner = NewHashPartitioner
c.Producer.Retry.Max = 3
c.Producer.Retry.Backoff = 100 * time.Millisecond
c.Producer.ReturnErrors = true

c.Consumer.Fetch.Min = 1
c.Consumer.Fetch.Default = 32768
c.Consumer.Retry.Backoff = 2 * time.Second
c.Consumer.MaxWaitTime = 250 * time.Millisecond
c.Consumer.ReturnErrors = false

c.ChannelBufferSize = 256

@@ -175,7 +188,7 @@ func (c *Config) Validate() error {
return ConfigurationError("Invalid Metadata.RefreshFrequency, must be >= 0")
}

// validate the Produce values
// validate the Producer values
switch {
case c.Producer.MaxMessageBytes <= 0:
return ConfigurationError("Invalid Producer.MaxMessageBytes, must be > 0")
@@ -196,12 +209,12 @@ func (c *Config) Validate() error {
case c.Producer.Flush.MaxMessages > 0 && c.Producer.Flush.MaxMessages < c.Producer.Flush.Messages:
return ConfigurationError("Invalid Producer.Flush.MaxMessages, must be >= Producer.Flush.Messages when set")
case c.Producer.Retry.Max < 0:
return ConfigurationError("Invalid Producer.MaxRetries, must be >= 0")
return ConfigurationError("Invalid Producer.Retry.Max, must be >= 0")
case c.Producer.Retry.Backoff < 0:
return ConfigurationError("Invalid Producer.RetryBackoff, must be >= 0")
return ConfigurationError("Invalid Producer.Retry.Backoff, must be >= 0")
}

// validate the Consume values
// validate the Consumer values
switch {
case c.Consumer.Fetch.Min <= 0:
return ConfigurationError("Invalid Consumer.Fetch.Min, must be > 0")
@@ -211,6 +224,8 @@ func (c *Config) Validate() error {
return ConfigurationError("Invalid Consumer.Fetch.Max, must be >= 0")
case c.Consumer.MaxWaitTime < 1*time.Millisecond:
return ConfigurationError("Invalid Consumer.MaxWaitTime, must be > 1ms")
case c.Consumer.Retry.Backoff < 0:
return ConfigurationError("Invalid Consumer.Retry.Backoff, must be >= 0")
}

// validate misc shared values
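For orientation (not part of this commit), here is a minimal standalone sketch of how an application might opt into the new consumer settings introduced above. The import path and broker address are assumptions for illustration only:

package main

import (
	"log"
	"time"

	"github.com/Shopify/sarama" // assumed import path
)

func main() {
	config := sarama.NewConfig()

	// Retry a failed partition read after 500ms instead of the default 2s.
	config.Consumer.Retry.Backoff = 500 * time.Millisecond

	// Deliver consume errors on the Errors channel instead of only logging them.
	config.Consumer.ReturnErrors = true

	// Validate catches configuration mistakes, e.g. a negative Retry.Backoff.
	if err := config.Validate(); err != nil {
		log.Fatalln(err)
	}

	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config) // placeholder broker address
	if err != nil {
		log.Fatalln(err)
	}
	defer func() {
		if err := consumer.Close(); err != nil {
			log.Fatalln(err)
		}
	}()
}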
58 changes: 37 additions & 21 deletions consumer.go
@@ -3,6 +3,7 @@ package sarama
import (
"fmt"
"sync"
"sync/atomic"
"time"
)

@@ -207,9 +208,16 @@ func (c *consumer) unrefBrokerConsumer(broker *Broker) {
// PartitionConsumer

// PartitionConsumer processes Kafka messages from a given topic and partition. You MUST call Close()
// on a consumer to avoid leaks, it will not be garbage-collected automatically when it passes out of
// scope (this is in addition to calling Close on the underlying consumer's client, which is still necessary).
// You have to read from both the Messages and Errors channels to prevent the consumer from locking eventually.
// or AsyncClose() on a consumer to avoid leaks, it will not be garbage-collected automatically when
// it passes out of scope (this is in addition to calling Close on the underlying consumer's client,
// which is still necessary).
//
// The simplest way of using a PartitionConsumer is to loop over its Messages channel using a for/range
// loop. The PartitionConsumer will under no circumstances stop by itself once it is started. It will
// just keep retrying if it encounters errors. By default, it just logs these errors to sarama.Logger;
// if you want to handle errors yourself, set your config's Consumer.ReturnErrors to true, and read
// from the Errors channel as well, using a select statement or in a separate goroutine. Check out
// the examples of Consumer to see these different approaches in action.
type PartitionConsumer interface {

// AsyncClose initiates a shutdown of the PartitionConsumer. This method will return immediately,
@@ -224,15 +232,18 @@ type PartitionConsumer interface {
// call this before calling Close on the underlying client.
Close() error

// Errors returns the read channel for any errors that occurred while consuming the partition.
// You have to read this channel to prevent the consumer from deadlock. Under no circumstances,
// the partition consumer will shut down by itself. It will just wait until it is able to continue
// consuming messages. If you want to shut down your consumer, you will have trigger it yourself
// by consuming this channel and calling Close or AsyncClose when appropriate.
// Messages returns the read channel for the messages that are returned by the broker.
Messages() <-chan *ConsumerMessage

// Errors returns a read channel of errors that occurred during consuming, if enabled. By default,
// errors are logged and not returned over this channel. If you want to implement any custom error
// handling, set your config's Consumer.ReturnErrors setting to true, and read from this channel.
Errors() <-chan *ConsumerError

// Messages returns the read channel for the messages that are returned by the broker
Messages() <-chan *ConsumerMessage
// HighWaterMarkOffset returns the high water mark offset of the partition, i.e. the offset that will
// be used for the next message that will be produced. You can use this to determine how far behind
// the processing is.
HighWaterMarkOffset() int64
}

type partitionConsumer struct {
@@ -246,24 +257,31 @@ type partitionConsumer struct {
errors chan *ConsumerError
trigger, dying chan none

fetchSize int32
offset int64
fetchSize int32
offset int64
highWaterMarkOffset int64
}

func (child *partitionConsumer) sendError(err error) {
child.errors <- &ConsumerError{
cErr := &ConsumerError{
Topic: child.topic,
Partition: child.partition,
Err: err,
}

if child.conf.Consumer.ReturnErrors {
child.errors <- cErr
} else {
Logger.Println(cErr)
}
}

func (child *partitionConsumer) dispatcher() {
for _ = range child.trigger {
select {
case <-child.dying:
close(child.trigger)
default:
case <-time.After(child.conf.Consumer.Retry.Backoff):
if child.broker != nil {
child.consumer.unrefBrokerConsumer(child.broker)
child.broker = nil
@@ -272,13 +290,6 @@ func (child *partitionConsumer) dispatcher() {
if err := child.dispatch(); err != nil {
child.sendError(err)
child.trigger <- none{}

// there's no point in trying again *right* away
select {
case <-child.dying:
close(child.trigger)
case <-time.After(10 * time.Second):
}
}
}
}
@@ -365,6 +376,10 @@ func (child *partitionConsumer) Close() error {
return nil
}

func (child *partitionConsumer) HighWaterMarkOffset() int64 {
return atomic.LoadInt64(&child.highWaterMarkOffset)
}

// brokerConsumer

type brokerConsumer struct {
@@ -534,6 +549,7 @@ func (w *brokerConsumer) handleResponse(child *partitionConsumer, block *FetchRe

// we got messages, reset our fetch size in case it was increased for a previous request
child.fetchSize = child.conf.Consumer.Fetch.Default
atomic.StoreInt64(&child.highWaterMarkOffset, block.HighWaterMarkOffset)

incomplete := false
atLeastOne := false
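A similar sketch (same assumptions as above; the topic name and partition are placeholders) showing how the new HighWaterMarkOffset accessor can be combined with message offsets to estimate how far behind consumption is:

package main

import (
	"log"

	"github.com/Shopify/sarama" // assumed import path
)

func main() {
	master, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
	if err != nil {
		log.Fatalln(err)
	}
	defer master.Close()

	pc, err := master.ConsumePartition("my_topic", 0, sarama.OffsetOldest)
	if err != nil {
		log.Fatalln(err)
	}
	defer pc.Close()

	for msg := range pc.Messages() {
		// The high water mark is the offset the broker will assign to the next message,
		// so the number of messages still waiting on this partition is approximately:
		lag := pc.HighWaterMarkOffset() - msg.Offset - 1
		log.Printf("consumed offset %d, roughly %d message(s) behind", msg.Offset, lag)
	}
}

As in the ExampleConsumer_for_loop added to consumer_test.go below, a real program would also arrange a shutdown path (for example AsyncClose from a signal handler) so the range loop terminates.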
69 changes: 64 additions & 5 deletions consumer_test.go
@@ -65,6 +65,8 @@ func TestConsumerLatestOffset(t *testing.T) {

fetchResponse := new(FetchResponse)
fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), 0x010101)
block := fetchResponse.GetBlock("my_topic", 0)
block.HighWaterMarkOffset = 1234
leader.Returns(fetchResponse)

master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
@@ -78,13 +80,25 @@
t.Fatal(err)
}

leader.Close()
safeClose(t, consumer)
msg := <-consumer.Messages()

// the message we delivered should have the offset that was returned in the OffsetResponse
if msg.Offset != 0x010101 {
t.Error("Latest message offset not fetched correctly:", msg.Offset)
}

// we deliver one message, so it should be one higher than we return in the OffsetResponse
// this way it is set correctly for the next FetchRequest.
if consumer.(*partitionConsumer).offset != 0x010102 {
t.Error("Latest offset not fetched correctly:", consumer.(*partitionConsumer).offset)
}

if hwmo := consumer.HighWaterMarkOffset(); hwmo != 1234 {
t.Errorf("Expected high water mark offset 1234, found %d", hwmo)
}

leader.Close()
safeClose(t, consumer)
}

func TestConsumerFunnyOffsets(t *testing.T) {
@@ -142,7 +156,9 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
seedBroker.Returns(metadataResponse)

// launch test goroutines
master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
config := NewConfig()
config.Consumer.Retry.Backoff = 0
master, err := NewConsumer([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}
@@ -290,10 +306,50 @@ func TestConsumerInterleavedClose(t *testing.T) {
seedBroker.Close()
}

// This example has the simplest use case of the consumer. It simply
// iterates over the messages channel using a for/range loop. Because
// a consumer never stops unless requested, a signal handler is registered
// so we can trigger a clean shutdown of the consumer.
func ExampleConsumer_for_loop() {
master, err := NewConsumer([]string{"localhost:9092"}, nil)
if err != nil {
log.Fatalln(err)
}
defer func() {
if err := master.Close(); err != nil {
log.Fatalln(err)
}
}()

consumer, err := master.ConsumePartition("my_topic", 0, 0)
if err != nil {
log.Fatalln(err)
}

go func() {
// By default, the consumer will always keep going, unless we tell it to stop.
// In this case, we capture the SIGINT signal so we can tell the consumer to stop
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
<-signals
consumer.AsyncClose()
}()

msgCount := 0
for message := range consumer.Messages() {
log.Println(string(message.Value))
msgCount++
}
log.Println("Processed", msgCount, "messages.")
}

// This example shows how to use a consumer with a select statement
// dealing with the different channels.
func ExampleConsumer_select() {
master, err := NewConsumer([]string{"localhost:9092"}, nil)
config := NewConfig()
config.Consumer.ReturnErrors = true // Handle errors manually instead of letting Sarama log them.

master, err := NewConsumer([]string{"localhost:9092"}, config)
if err != nil {
log.Fatalln(err)
}
@@ -336,7 +392,10 @@ consumerLoop:
// This example shows how to use a consumer with different goroutines
// to read from the Messages and Errors channels.
func ExampleConsumer_goroutines() {
master, err := NewConsumer([]string{"localhost:9092"}, nil)
config := NewConfig()
config.Consumer.ReturnErrors = true // Handle errors manually instead of letting Sarama log them.

master, err := NewConsumer([]string{"localhost:9092"}, config)
if err != nil {
log.Fatalln(err)
}
40 changes: 38 additions & 2 deletions functional_test.go
@@ -90,7 +90,7 @@ func TestFuncMultiPartitionProduce(t *testing.T) {
config.ChannelBufferSize = 20
config.Producer.Flush.Frequency = 50 * time.Millisecond
config.Producer.Flush.Messages = 200
config.Producer.AckSuccesses = true
config.Producer.ReturnSuccesses = true
producer, err := NewProducer([]string{kafkaAddr}, config)
if err != nil {
t.Fatal(err)
@@ -122,7 +122,9 @@ func TestFuncMultiPartitionProduce(t *testing.T) {
func testProducingMessages(t *testing.T, config *Config) {
checkKafkaAvailability(t)

config.Producer.AckSuccesses = true
config.Producer.ReturnSuccesses = true
config.Consumer.ReturnErrors = true

client, err := NewClient([]string{kafkaAddr}, config)
if err != nil {
t.Fatal(err)
@@ -182,3 +184,37 @@ func testProducingMessages(t *testing.T, config *Config) {
safeClose(t, consumer)
safeClose(t, client)
}

func TestConsumerHighWaterMarkOffset(t *testing.T) {
checkKafkaAvailability(t)

p, err := NewSyncProducer([]string{kafkaAddr}, nil)
if err != nil {
t.Fatal(err)
}
defer safeClose(t, p)

_, offset, err := p.SendMessage("single_partition", nil, StringEncoder("Test"))
if err != nil {
t.Fatal(err)
}

c, err := NewConsumer([]string{kafkaAddr}, nil)
if err != nil {
t.Fatal(err)
}
defer safeClose(t, c)

pc, err := c.ConsumePartition("single_partition", 0, OffsetOldest)
if err != nil {
t.Fatal(err)
}

<-pc.Messages()

if hwmo := pc.HighWaterMarkOffset(); hwmo != offset+1 {
t.Logf("Last produced offset %d; high water mark should be one higher but found %d.", offset, hwmo)
}

safeClose(t, pc)
}
(6 more changed files not shown)