Merge pull request #336 from Shopify/configuration_tweaks
Configuration tweaks
wvanbergen committed Mar 12, 2015
2 parents 3e3929d + 251a2db commit 35543a8
Showing 9 changed files with 132 additions and 55 deletions.
25 changes: 20 additions & 5 deletions config.go
@@ -42,7 +42,9 @@ type Config struct {
// Similar to the `partitioner.class` setting for the JVM producer.
Partitioner PartitionerConstructor
// If enabled, successfully delivered messages will be returned on the Successes channel (default disabled).
AckSuccesses bool
ReturnSuccesses bool
// If enabled, messages that failed to deliver will be returned on the Errors channel, including the error that caused the failure (default enabled).
ReturnErrors bool

// The following config options control how often messages are batched up and sent to the broker. By default,
// messages are sent as fast as possible, and all messages received while the current batch is in-flight are placed
@@ -68,6 +70,11 @@ type Config struct {

// Consumer is the namespace for configuration related to consuming messages, used by the Consumer.
Consumer struct {
Retry struct {
// How long to wait after a failed read from a partition before trying again (default 2s).
Backoff time.Duration
}

// Fetch is the namespace for controlling how many bytes are retrieved by any given request.
Fetch struct {
// The minimum number of message bytes to fetch in a request - the broker will wait until at least this many are available.
@@ -87,6 +94,9 @@ type Config struct {
// 100-500ms is a reasonable range for most cases. Kafka only supports precision up to milliseconds; nanoseconds will be truncated.
// Equivalent to the JVM's `fetch.wait.max.ms`.
MaxWaitTime time.Duration

// If enabled, any errors that occurred while consuming are returned on the Errors channel (default disabled).
ReturnErrors bool
}

// A user-provided string sent with every request to the brokers for logging, debugging, and auditing purposes.
@@ -117,10 +127,13 @@ func NewConfig() *Config {
c.Producer.Partitioner = NewHashPartitioner
c.Producer.Retry.Max = 3
c.Producer.Retry.Backoff = 100 * time.Millisecond
c.Producer.ReturnErrors = true

c.Consumer.Fetch.Min = 1
c.Consumer.Fetch.Default = 32768
c.Consumer.Retry.Backoff = 2 * time.Second
c.Consumer.MaxWaitTime = 250 * time.Millisecond
c.Consumer.ReturnErrors = false

c.ChannelBufferSize = 256

@@ -175,7 +188,7 @@ func (c *Config) Validate() error {
return ConfigurationError("Invalid Metadata.RefreshFrequency, must be >= 0")
}

// validate the Produce values
// validate the Producer values
switch {
case c.Producer.MaxMessageBytes <= 0:
return ConfigurationError("Invalid Producer.MaxMessageBytes, must be > 0")
@@ -196,12 +209,12 @@ func (c *Config) Validate() error {
case c.Producer.Flush.MaxMessages > 0 && c.Producer.Flush.MaxMessages < c.Producer.Flush.Messages:
return ConfigurationError("Invalid Producer.Flush.MaxMessages, must be >= Producer.Flush.Messages when set")
case c.Producer.Retry.Max < 0:
return ConfigurationError("Invalid Producer.MaxRetries, must be >= 0")
return ConfigurationError("Invalid Producer.Retry.Max, must be >= 0")
case c.Producer.Retry.Backoff < 0:
return ConfigurationError("Invalid Producer.RetryBackoff, must be >= 0")
return ConfigurationError("Invalid Producer.Retry.Backoff, must be >= 0")
}

// validate the Consume values
// validate the Consumer values
switch {
case c.Consumer.Fetch.Min <= 0:
return ConfigurationError("Invalid Consumer.Fetch.Min, must be > 0")
@@ -211,6 +224,8 @@ func (c *Config) Validate() error {
return ConfigurationError("Invalid Consumer.Fetch.Max, must be >= 0")
case c.Consumer.MaxWaitTime < 1*time.Millisecond:
return ConfigurationError("Invalid Consumer.MaxWaitTime, must be > 1ms")
case c.Consumer.Retry.Backoff < 0:
return ConfigurationError("Invalid Consumer.Retry.Backoff, must be >= 0")
}

// validate misc shared values
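As a quick reference for the configuration surface this diff touches: the field names below (Producer.ReturnSuccesses, Producer.ReturnErrors, Consumer.ReturnErrors, Consumer.Retry.Backoff) are taken from the hunks above, but the program itself — import path, values, and error handling — is only an illustrative sketch, not part of the commit.

package main

import (
	"log"
	"time"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()

	// Producer.AckSuccesses is renamed to ReturnSuccesses in this commit;
	// Producer.ReturnErrors (default true) now gates delivery on the Errors channel.
	config.Producer.ReturnSuccesses = true
	config.Producer.ReturnErrors = true

	// Consumer errors are only returned on the Errors channel when ReturnErrors
	// is enabled; Retry.Backoff (default 2s) is how long the partition consumer
	// waits after a failed read before retrying.
	config.Consumer.ReturnErrors = true
	config.Consumer.Retry.Backoff = 500 * time.Millisecond

	if err := config.Validate(); err != nil {
		log.Fatalln("invalid configuration:", err)
	}
}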
44 changes: 24 additions & 20 deletions consumer.go
@@ -207,9 +207,16 @@ func (c *consumer) unrefBrokerConsumer(broker *Broker) {
// PartitionConsumer

// PartitionConsumer processes Kafka messages from a given topic and partition. You MUST call Close()
// on a consumer to avoid leaks, it will not be garbage-collected automatically when it passes out of
// scope (this is in addition to calling Close on the underlying consumer's client, which is still necessary).
// You have to read from both the Messages and Errors channels to prevent the consumer from locking eventually.
// or AsyncClose() on a consumer to avoid leaks, it will not be garbage-collected automatically when
// it passes out of scope (this is in addition to calling Close on the underlying consumer's client,
// which is still necessary).
//
// The simplest way of using a PartitionConsumer is to loop over its Messages channel using a for/range
// loop. The PartitionConsumer will under no circumstances stop by itself once it is started; it will
// just keep retrying if it encounters errors. By default, it only logs these errors to sarama.Logger;
// if you want to handle errors yourself, set your config's Consumer.ReturnErrors to true, and read
// from the Errors channel as well, using a select statement or a separate goroutine. See the
// Consumer examples below for these different approaches.
type PartitionConsumer interface {

// AsyncClose initiates a shutdown of the PartitionConsumer. This method will return immediately,
@@ -224,15 +231,13 @@ type PartitionConsumer interface {
// call this before calling Close on the underlying client.
Close() error

// Errors returns the read channel for any errors that occurred while consuming the partition.
// You have to read this channel to prevent the consumer from deadlock. Under no circumstances,
// the partition consumer will shut down by itself. It will just wait until it is able to continue
// consuming messages. If you want to shut down your consumer, you will have trigger it yourself
// by consuming this channel and calling Close or AsyncClose when appropriate.
Errors() <-chan *ConsumerError

// Messages returns the read channel for the messages that are returned by the broker
// Messages returns the read channel for the messages that are returned by the broker.
Messages() <-chan *ConsumerMessage

// Errors returns a read channel of errors that occurred during consuming, if enabled. By default,
// errors are logged and not returned over this channel. If you want to implement any custom error
// handling, set your config's Consumer.ReturnErrors setting to true, and read from this channel.
Errors() <-chan *ConsumerError
}

type partitionConsumer struct {
@@ -251,19 +256,25 @@ type partitionConsumer struct {
}

func (child *partitionConsumer) sendError(err error) {
child.errors <- &ConsumerError{
cErr := &ConsumerError{
Topic: child.topic,
Partition: child.partition,
Err: err,
}

if child.conf.Consumer.ReturnErrors {
child.errors <- cErr
} else {
Logger.Println(cErr)
}
}

func (child *partitionConsumer) dispatcher() {
for _ = range child.trigger {
select {
case <-child.dying:
close(child.trigger)
default:
case <-time.After(child.conf.Consumer.Retry.Backoff):
if child.broker != nil {
child.consumer.unrefBrokerConsumer(child.broker)
child.broker = nil
@@ -272,13 +283,6 @@ func (child *partitionConsumer) dispatcher() {
if err := child.dispatch(); err != nil {
child.sendError(err)
child.trigger <- none{}

// there's no point in trying again *right* away
select {
case <-child.dying:
close(child.trigger)
case <-time.After(10 * time.Second):
}
}
}
}
51 changes: 48 additions & 3 deletions consumer_test.go
@@ -142,7 +142,9 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
seedBroker.Returns(metadataResponse)

// launch test goroutines
master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
config := NewConfig()
config.Consumer.Retry.Backoff = 0
master, err := NewConsumer([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}
@@ -290,10 +292,50 @@ func TestConsumerInterleavedClose(t *testing.T) {
seedBroker.Close()
}

// This example shows the simplest use case of the consumer: it simply
// iterates over the Messages channel using a for/range loop. Because
// a consumer never stops unless requested, a signal handler is registered
// so we can trigger a clean shutdown of the consumer.
func ExampleConsumer_for_loop() {
master, err := NewConsumer([]string{"localhost:9092"}, nil)
if err != nil {
log.Fatalln(err)
}
defer func() {
if err := master.Close(); err != nil {
log.Fatalln(err)
}
}()

consumer, err := master.ConsumePartition("my_topic", 0, 0)
if err != nil {
log.Fatalln(err)
}

go func() {
// By default, the consumer will always keep going, unless we tell it to stop.
// In this case, we capture the SIGINT signal so we can tell the consumer to stop
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
<-signals
consumer.AsyncClose()
}()

msgCount := 0
for message := range consumer.Messages() {
log.Println(string(message.Value))
msgCount++
}
log.Println("Processed", msgCount, "messages.")
}

// This example shows how to use a consumer with a select statement
// dealing with the different channels.
func ExampleConsumer_select() {
master, err := NewConsumer([]string{"localhost:9092"}, nil)
config := NewConfig()
config.Consumer.ReturnErrors = true // Handle errors manually instead of letting Sarama log them.

master, err := NewConsumer([]string{"localhost:9092"}, config)
if err != nil {
log.Fatalln(err)
}
@@ -336,7 +378,10 @@ consumerLoop:
// This example shows how to use a consumer with different goroutines
// to read from the Messages and Errors channels.
func ExampleConsumer_goroutines() {
master, err := NewConsumer([]string{"localhost:9092"}, nil)
config := NewConfig()
config.Consumer.ReturnErrors = true // Handle errors manually instead of letting Sarama log them.

master, err := NewConsumer([]string{"localhost:9092"}, config)
if err != nil {
log.Fatalln(err)
}
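The body of ExampleConsumer_goroutines is collapsed in this view. A rough sketch of the pattern its comment describes — Messages and Errors drained on separate goroutines with Consumer.ReturnErrors enabled — might look like the following; the function name, broker address, topic, and shutdown wiring are illustrative rather than the committed example, and it assumes the same imports as the examples above plus "sync".

func ExampleConsumer_goroutines_sketch() {
	config := NewConfig()
	config.Consumer.ReturnErrors = true // Handle errors manually instead of letting Sarama log them.

	master, err := NewConsumer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatalln(err)
	}
	defer func() {
		if err := master.Close(); err != nil {
			log.Fatalln(err)
		}
	}()

	consumer, err := master.ConsumePartition("my_topic", 0, 0)
	if err != nil {
		log.Fatalln(err)
	}

	// As in the for/range example, AsyncClose on SIGINT closes both channels,
	// which lets the two draining goroutines below finish.
	go func() {
		signals := make(chan os.Signal, 1)
		signal.Notify(signals, os.Interrupt)
		<-signals
		consumer.AsyncClose()
	}()

	var wg sync.WaitGroup

	wg.Add(1)
	go func() {
		defer wg.Done()
		for consumerError := range consumer.Errors() {
			log.Println("consume error:", consumerError)
		}
	}()

	wg.Add(1)
	go func() {
		defer wg.Done()
		for message := range consumer.Messages() {
			log.Println(string(message.Value))
		}
	}()

	wg.Wait()
}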
6 changes: 4 additions & 2 deletions functional_test.go
@@ -90,7 +90,7 @@ func TestFuncMultiPartitionProduce(t *testing.T) {
config.ChannelBufferSize = 20
config.Producer.Flush.Frequency = 50 * time.Millisecond
config.Producer.Flush.Messages = 200
config.Producer.AckSuccesses = true
config.Producer.ReturnSuccesses = true
producer, err := NewProducer([]string{kafkaAddr}, config)
if err != nil {
t.Fatal(err)
@@ -122,7 +122,9 @@ func testProducingMessages(t *testing.T, config *Config) {
func testProducingMessages(t *testing.T, config *Config) {
checkKafkaAvailability(t)

config.Producer.AckSuccesses = true
config.Producer.ReturnSuccesses = true
config.Consumer.ReturnErrors = true

client, err := NewClient([]string{kafkaAddr}, config)
if err != nil {
t.Fatal(err)
8 changes: 5 additions & 3 deletions mocks/producer.go
@@ -52,11 +52,13 @@ func NewProducer(t ErrorReporter, config *sarama.Config) *Producer {
expectation := mp.expectations[0]
mp.expectations = mp.expectations[1:]
if expectation.Result == errProduceSuccess {
if config.Producer.AckSuccesses {
if config.Producer.ReturnSuccesses {
mp.successes <- msg
}
} else {
mp.errors <- &sarama.ProducerError{Err: expectation.Result, Msg: msg}
if config.Producer.ReturnErrors {
mp.errors <- &sarama.ProducerError{Err: expectation.Result, Msg: msg}
}
}
}
mp.l.Unlock()
@@ -119,7 +121,7 @@ func (mp *Producer) Errors() <-chan *sarama.ProducerError {

// ExpectInputAndSucceed sets an expectation on the mock producer that a message will be provided
// on the input channel. The mock producer will handle the message as if it is produced successfully,
// i.e. it will make it available on the Successes channel if the Producer.AckSuccesses setting
// i.e. it will make it available on the Successes channel if the Producer.ReturnSuccesses setting
// is set to true.
func (mp *Producer) ExpectInputAndSucceed() {
mp.l.Lock()
2 changes: 1 addition & 1 deletion mocks/producer_test.go
@@ -28,7 +28,7 @@ func TestMockProducerImplementsProducerInterface(t *testing.T) {

func TestProducerReturnsExpectationsToChannels(t *testing.T) {
config := sarama.NewConfig()
config.Producer.AckSuccesses = true
config.Producer.ReturnSuccesses = true
mp := NewProducer(t, config)

mp.ExpectInputAndSucceed()
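The remainder of TestProducerReturnsExpectationsToChannels is collapsed above. To illustrate the ExpectInputAndSucceed doc comment together with the renamed ReturnSuccesses setting, here is a hedged sketch of a typical test against the mock; the test name, topic, payload, and assertion are invented for illustration and assume the imports already used by this test file.

func TestProducerSucceedsOnExpectedInput(t *testing.T) {
	config := sarama.NewConfig()
	config.Producer.ReturnSuccesses = true

	// The mock satisfies the sarama.Producer interface but only checks
	// expectations; it never talks to a real broker.
	mp := NewProducer(t, config)
	defer func() {
		if err := mp.Close(); err != nil {
			t.Error(err)
		}
	}()

	mp.ExpectInputAndSucceed()
	mp.Input() <- &sarama.ProducerMessage{Topic: "my_topic", Value: sarama.StringEncoder("hello")}

	msg := <-mp.Successes()
	if msg.Topic != "my_topic" {
		t.Errorf("unexpected topic: %s", msg.Topic)
	}
}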
28 changes: 18 additions & 10 deletions producer.go
@@ -37,12 +37,13 @@ type Producer interface {
Input() chan<- *ProducerMessage

// Successes is the success output channel back to the user when AckSuccesses is configured.
// If AckSuccesses is true, you MUST read from this channel or the Producer will deadlock.
// If ReturnSuccesses is true, you MUST read from this channel or the Producer will deadlock.
// It is suggested that you send and read messages together in a single select statement.
Successes() <-chan *ProducerMessage

// Errors is the error output channel back to the user. You MUST read from this channel or the Producer will deadlock.
// It is suggested that you send messages and read errors together in a single select statement.
// Errors is the error output channel back to the user. You MUST read from this channel
// or the Producer will deadlock when the channel is full. Alternatively, you can set
// Producer.ReturnErrors in your config to false, which prevents errors from being returned.
Errors() <-chan *ProducerError
}

@@ -178,16 +179,18 @@ func (p *producer) Input() chan<- *ProducerMessage {
func (p *producer) Close() error {
p.AsyncClose()

if p.conf.Producer.AckSuccesses {
if p.conf.Producer.ReturnSuccesses {
go withRecover(func() {
for _ = range p.successes {
}
})
}

var errors ProducerErrors
for event := range p.errors {
errors = append(errors, event)
if p.conf.Producer.ReturnErrors {
for event := range p.errors {
errors = append(errors, event)
}
}

if len(errors) > 0 {
@@ -258,7 +261,7 @@ func (p *producer) topicDispatcher() {
if p.ownClient {
err := p.client.Close()
if err != nil {
p.errors <- &ProducerError{Err: err}
Logger.Println("producer/shutdown failed to close the embedded client:", err)
}
}
close(p.errors)
@@ -538,7 +541,7 @@ func (p *producer) flusher(broker *Broker, input chan []*ProducerMessage) {

if response == nil {
// this only happens when RequiredAcks is NoResponse, so we have to assume success
if p.conf.Producer.AckSuccesses {
if p.conf.Producer.ReturnSuccesses {
p.returnSuccesses(batch)
}
continue
@@ -558,7 +561,7 @@ func (p *producer) flusher(broker *Broker, input chan []*ProducerMessage) {
switch block.Err {
case ErrNoError:
// All the messages for this topic-partition were delivered successfully!
if p.conf.Producer.AckSuccesses {
if p.conf.Producer.ReturnSuccesses {
for i := range msgs {
msgs[i].offset = block.Offset + int64(i)
}
@@ -731,7 +734,12 @@ func (p *producer) buildRequest(batch map[string]map[int32][]*ProducerMessage) *
func (p *producer) returnError(msg *ProducerMessage, err error) {
msg.flags = 0
msg.retries = 0
p.errors <- &ProducerError{Msg: msg, Err: err}
pErr := &ProducerError{Msg: msg, Err: err}
if p.conf.Producer.ReturnErrors {
p.errors <- pErr
} else {
Logger.Println(pErr)
}
}

func (p *producer) returnErrors(batch []*ProducerMessage, err error) {
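Tying the producer-side changes together: the interface comments above say you must drain Successes when ReturnSuccesses is enabled, and Errors unless ReturnErrors is set to false. A minimal sketch of the send-and-read-in-one-select pattern those comments suggest, assuming the package-internal identifiers used above and a "log" import — the function name, broker address, topic, and message count are illustrative only:

func produceWithSelect() {
	config := NewConfig()
	config.Producer.ReturnSuccesses = true // we read Successes in the select below
	// Producer.ReturnErrors defaults to true, so Errors must be drained as well.

	producer, err := NewProducer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatalln(err)
	}
	defer func() {
		if err := producer.Close(); err != nil {
			log.Fatalln(err)
		}
	}()

	var enqueued, successes, failures int
	for enqueued < 100 {
		msg := &ProducerMessage{Topic: "my_topic", Value: StringEncoder("testing 123")}
		select {
		case producer.Input() <- msg:
			enqueued++
		case <-producer.Successes():
			successes++
		case err := <-producer.Errors():
			failures++
			log.Println("failed to produce message:", err)
		}
	}
	log.Printf("enqueued: %d; successes: %d; failures: %d", enqueued, successes, failures)
}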