Tweak numbers related to retries, and message delivery guarantee #106

Merged · 3 commits · Dec 27, 2018
streamconfig/kafkaconfig/producer.go (14 changes: 10 additions & 4 deletions)
@@ -54,15 +54,17 @@ type Producer struct {
IgnoreErrors []kafka.ErrorCode

// MaxDeliveryRetries dictates how many times to retry sending a failing
- // MessageSet. Note: retrying may cause reordering. Defaults to 2 retries. Use
- // `streamconfig.KafkaOrderedDelivery()` to guarantee order delivery.
+ // MessageSet. Defaults to 5 retries.
MaxDeliveryRetries int `kafka:"message.send.max.retries" split_words:"true"`

// MaxInFlightRequests dictates the maximum number of in-flight requests per
// broker connection. This is a generic property applied to all broker
// communication, however it is primarily relevant to produce requests. In
// particular, note that other mechanisms limit the number of outstanding
// consumer fetch request per broker to one.
+ //
+ // Note: having more than one in flight request may cause reordering. Use
+ // `streamconfig.KafkaOrderedDelivery()` to guarantee order delivery.
MaxInFlightRequests int `kafka:"max.in.flight.requests.per.connection,omitempty" split_words:"true"` // nolint: lll

// MaxQueueBufferDuration is the delay to wait for messages in the producer
@@ -92,6 +94,9 @@ type Producer struct {
// Defaults to `AckLeader`.
RequiredAcks Ack `kafka:"{topic}.request.required.acks" split_words:"true"`

+ // RetryBackoff sets the backoff time before retrying a protocol request.
+ RetryBackoff time.Duration `kafka:"retry.backoff.ms" split_words:"true"`

// SecurityProtocol is the protocol used to communicate with brokers.
SecurityProtocol Protocol `kafka:"security.protocol,omitempty" split_words:"true"`

Expand Down Expand Up @@ -164,12 +169,13 @@ var ProducerDefaults = Producer{
kafka.ErrNotEnoughReplicasAfterAppend,
kafka.ErrUnknownMemberID,
},
- MaxDeliveryRetries: 2,
+ MaxDeliveryRetries: 5,
MaxInFlightRequests: 1000000,
MaxQueueBufferDuration: 10 * time.Millisecond,
MaxQueueSizeKBytes: 2097151,
MaxQueueSizeMessages: 1000000,
- RequiredAcks: AckLeader,
+ RequiredAcks: AckAll,
+ RetryBackoff: 15 * time.Second,
SecurityProtocol: ProtocolPlaintext,
SessionTimeout: 30 * time.Second,
SSL: SSL{},
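Taken together, this file shifts the default delivery posture: more retries (5 instead of 2), a 15-second backoff between protocol retries, and acknowledgement from all in-sync replicas rather than only the leader, while the reordering caveat moves from the retry knob to the in-flight-requests knob. Below is a minimal sketch of working from these defaults; the import path is an assumption based on the directory layout in this PR, and `streamconfig.KafkaOrderedDelivery()` (referenced in the new comment) remains the supported way to opt into ordered delivery.

package main

import (
	"fmt"

	// Assumed import path, inferred from the package layout shown in this PR;
	// adjust to the actual module path.
	"github.com/blendle/go-streamprocessor/streamconfig/kafkaconfig"
)

func main() {
	// Start from the defaults this PR introduces: 5 delivery retries, a 15s
	// retry backoff, and acknowledgement from all in-sync replicas.
	cfg := kafkaconfig.ProducerDefaults

	// With retries enabled and up to 1,000,000 in-flight requests per broker
	// connection, a retried MessageSet can land out of order. Capping
	// in-flight requests at one is roughly what ordered delivery needs; the
	// diff points callers at streamconfig.KafkaOrderedDelivery() rather than
	// setting this field by hand.
	cfg.MaxInFlightRequests = 1

	fmt.Printf("retries=%d backoff=%s acks=%v\n",
		cfg.MaxDeliveryRetries, cfg.RetryBackoff, cfg.RequiredAcks)
}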
streamconfig/kafkaconfig/producer_test.go (14 changes: 11 additions & 3 deletions)
@@ -19,6 +19,7 @@ var producerOmitempties = []string{
"{topic}.request.required.acks",
"message.send.max.retries",
"statistics.interval.ms",
"retry.backoff.ms",
}

func TestProducer(t *testing.T) {
@@ -37,7 +38,8 @@ func TestProducer(t *testing.T) {
MaxQueueBufferDuration: time.Duration(0),
MaxQueueSizeKBytes: 0,
MaxQueueSizeMessages: 0,
- RequiredAcks: kafkaconfig.AckAll,
+ RequiredAcks: kafkaconfig.AckLeader,
+ RetryBackoff: 10 * time.Second,
SecurityProtocol: kafkaconfig.ProtocolPlaintext,
SessionTimeout: time.Duration(0),
SSL: kafkaconfig.SSL{KeyPath: ""},
@@ -74,12 +76,13 @@ func TestProducerDefaults(t *testing.T) {
assert.Equal(t, kafkaconfig.CompressionSnappy, config.CompressionCodec)
assert.Equal(t, 1*time.Second, config.HeartbeatInterval)
assert.Equal(t, errs, config.IgnoreErrors)
- assert.Equal(t, 2, config.MaxDeliveryRetries)
+ assert.Equal(t, 5, config.MaxDeliveryRetries)
assert.Equal(t, 1000000, config.MaxInFlightRequests)
assert.Equal(t, 10*time.Millisecond, config.MaxQueueBufferDuration)
assert.Equal(t, 2097151, config.MaxQueueSizeKBytes)
assert.Equal(t, 1000000, config.MaxQueueSizeMessages)
- assert.EqualValues(t, kafkaconfig.AckLeader, config.RequiredAcks)
+ assert.EqualValues(t, kafkaconfig.AckAll, config.RequiredAcks)
+ assert.EqualValues(t, 15*time.Second, config.RetryBackoff)
assert.Equal(t, kafkaconfig.ProtocolPlaintext, config.SecurityProtocol)
assert.Equal(t, 30*time.Second, config.SessionTimeout)
assert.Equal(t, kafkaconfig.SSL{}, config.SSL)
@@ -233,6 +236,11 @@ func TestProducer_ConfigMap(t *testing.T) {
&kafka.ConfigMap{"default.topic.config": kafka.ConfigMap{"request.required.acks": -1}},
},

"RetryBackoff": {
&kafkaconfig.Producer{RetryBackoff: 1 * time.Second},
&kafka.ConfigMap{"retry.backoff.ms": 1000},
},

"securityProtocol (plaintext)": {
&kafkaconfig.Producer{SecurityProtocol: kafkaconfig.ProtocolPlaintext},
&kafka.ConfigMap{"security.protocol": "plaintext"},
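The new ConfigMap case also pins down the unit conversion: the `RetryBackoff` duration is written to librdkafka's `retry.backoff.ms` key as a whole number of milliseconds (1 second becomes 1000). The real mapping lives in the `kafka` struct tag and the config-map builder; the helper below is hypothetical and only illustrates the conversion the test asserts.

package main

import (
	"fmt"
	"time"
)

// durationToMs mirrors the conversion the new ConfigMap test asserts: a
// time.Duration becomes the integer millisecond count stored under
// "retry.backoff.ms". Hypothetical helper, for illustration only.
func durationToMs(d time.Duration) int {
	return int(d / time.Millisecond)
}

func main() {
	fmt.Println(durationToMs(1 * time.Second)) // 1000, as asserted in the test
}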
streamconfig/option.go (11 changes: 11 additions & 0 deletions)
@@ -324,6 +324,17 @@ func KafkaRequireAllAck() Option {
})
}

+ // KafkaRetryBackoff configures the producer to use the configured retry
+ // backoff before retrying a connection failure. See `KafkaMaxDeliveryRetries`
+ // to configure the amount of retries to execute before returning an error.
+ //
+ // This option has no effect when applied to a consumer.
+ func KafkaRetryBackoff(d time.Duration) Option {
+ 	return optionFunc(func(_ *Consumer, p *Producer) {
+ 		p.Kafka.RetryBackoff = d
+ 	})
+ }

// KafkaSecurityProtocol configures the producer or consumer to use the
// specified security protocol.
func KafkaSecurityProtocol(s kafkaconfig.Protocol) Option {
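For callers, the new knob composes with the existing option functions. A hedged usage sketch follows; the import paths are assumptions based on the package layout, and how the option slice is ultimately consumed (for example by a producer constructor) is outside this diff. `KafkaMaxDeliveryRetries`, referenced in the new doc comment, is not shown here because its signature is not part of this change.

package main

import (
	"time"

	// Assumed import paths, inferred from the package layout shown in this PR.
	"github.com/blendle/go-streamprocessor/streamconfig"
	"github.com/blendle/go-streamprocessor/streamconfig/kafkaconfig"
)

func main() {
	// Compose the new KafkaRetryBackoff option with existing options from
	// this package; all three constructors appear in this diff.
	opts := []streamconfig.Option{
		streamconfig.KafkaRetryBackoff(30 * time.Second), // backoff between retried requests
		streamconfig.KafkaRequireAllAck(),                // acks from all in-sync replicas
		streamconfig.KafkaSecurityProtocol(kafkaconfig.ProtocolSSL),
	}
	_ = opts // pass to whatever constructor the library exposes
}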
streamconfig/option_test.go (10 changes: 10 additions & 0 deletions)
@@ -330,6 +330,16 @@ func TestOptions(t *testing.T) {
},
},

"KafkaRetryBackoff": {
[]streamconfig.Option{streamconfig.KafkaRetryBackoff(1 * time.Minute)},
streamconfig.Consumer{
Kafka: kafkaconfig.Consumer{},
},
streamconfig.Producer{
Kafka: kafkaconfig.Producer{RetryBackoff: 1 * time.Minute},
},
},

"KafkaSecurityProtocol": {
[]streamconfig.Option{streamconfig.KafkaSecurityProtocol(kafkaconfig.ProtocolSSL)},
streamconfig.Consumer{
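On the "message delivery guarantee" half of the PR title: with up to 5 retries spaced roughly 15 seconds apart, a persistently failing produce request can keep retrying for on the order of a minute before an error surfaces. The sketch below is a back-of-the-envelope estimate only; it ignores per-request timeouts and librdkafka's message timeout, which this PR does not touch.

package main

import (
	"fmt"
	"time"
)

func main() {
	// Rough estimate of the retry window under the new defaults; not a
	// guarantee, since other timeouts also bound delivery.
	const maxDeliveryRetries = 5
	retryBackoff := 15 * time.Second

	fmt.Println(time.Duration(maxDeliveryRetries) * retryBackoff) // 1m15s
}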