From f0062da5ae1a62f7841e563a244e11a5ad9b96d7 Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Thu, 24 Nov 2016 10:00:52 -0500 Subject: [PATCH] Sync Producer: Don't change config in constructor It's surprising, for one thing, and as Alexander Zhuravlev discovered it can actually lead to data races if you already have async producers active using the same config object. Instead, validate that it is pre-set correctly and return a config error if it is not, so it is on the user to set things up properly (it's only one flag away from default so it's not that big a deal). --- sync_producer.go | 28 ++++++++++++++++++++++++++-- sync_producer_test.go | 3 +++ 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/sync_producer.go b/sync_producer.go index b181527f0..c77ae3140 100644 --- a/sync_producer.go +++ b/sync_producer.go @@ -9,6 +9,9 @@ import "sync" // The SyncProducer comes with two caveats: it will generally be less efficient than the AsyncProducer, and the actual // durability guarantee provided when a message is acknowledged depend on the configured value of `Producer.RequiredAcks`. // There are configurations where a message acknowledged by the SyncProducer can still sometimes be lost. +// +// For implementation reasons, the SyncProducer requires `Producer.Return.Errors` and `Producer.Return.Successes` to +// be set to true in its configuration. type SyncProducer interface { // SendMessage produces a given message, and returns only when it either has @@ -36,6 +39,15 @@ type syncProducer struct { // NewSyncProducer creates a new SyncProducer using the given broker addresses and configuration. func NewSyncProducer(addrs []string, config *Config) (SyncProducer, error) { + if config == nil { + config = NewConfig() + config.Producer.Return.Successes = true + } + + if err := verifyProducerConfig(config); err != nil { + return nil, err + } + p, err := NewAsyncProducer(addrs, config) if err != nil { return nil, err @@ -46,6 +58,10 @@ func NewSyncProducer(addrs []string, config *Config) (SyncProducer, error) { // NewSyncProducerFromClient creates a new SyncProducer using the given client. It is still // necessary to call Close() on the underlying client when shutting down this producer. func NewSyncProducerFromClient(client Client) (SyncProducer, error) { + if err := verifyProducerConfig(client.Config()); err != nil { + return nil, err + } + p, err := NewAsyncProducerFromClient(client) if err != nil { return nil, err @@ -54,8 +70,6 @@ func NewSyncProducerFromClient(client Client) (SyncProducer, error) { } func newSyncProducerFromAsyncProducer(p *asyncProducer) *syncProducer { - p.conf.Producer.Return.Successes = true - p.conf.Producer.Return.Errors = true sp := &syncProducer{producer: p} sp.wg.Add(2) @@ -65,6 +79,16 @@ func newSyncProducerFromAsyncProducer(p *asyncProducer) *syncProducer { return sp } +func verifyProducerConfig(config *Config) error { + if !config.Producer.Return.Errors { + return ConfigurationError("Producer.Return.Errors must be true to be used in a SyncProducer") + } + if !config.Producer.Return.Successes { + return ConfigurationError("Producer.Return.Successes must be true to be used in a SyncProducer") + } + return nil +} + func (sp *syncProducer) SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error) { oldMetadata := msg.Metadata defer func() { diff --git a/sync_producer_test.go b/sync_producer_test.go index 12ed20e1f..c1519a148 100644 --- a/sync_producer_test.go +++ b/sync_producer_test.go @@ -69,6 +69,7 @@ func TestSyncProducerBatch(t *testing.T) { config := NewConfig() config.Producer.Flush.Messages = 3 + config.Producer.Return.Successes = true producer, err := NewSyncProducer([]string{seedBroker.Addr()}, config) if err != nil { t.Fatal(err) @@ -116,6 +117,7 @@ func TestConcurrentSyncProducer(t *testing.T) { config := NewConfig() config.Producer.Flush.Messages = 100 + config.Producer.Return.Successes = true producer, err := NewSyncProducer([]string{seedBroker.Addr()}, config) if err != nil { t.Fatal(err) @@ -155,6 +157,7 @@ func TestSyncProducerToNonExistingTopic(t *testing.T) { config := NewConfig() config.Metadata.Retry.Max = 0 config.Producer.Retry.Max = 0 + config.Producer.Return.Successes = true producer, err := NewSyncProducer([]string{broker.Addr()}, config) if err != nil {