Skip to content

Commit

Permalink
Merge pull request #790 from Shopify/sync-producer/dont-change-config
Browse files Browse the repository at this point in the history
Sync Producer: Don't change config in constructor
  • Loading branch information
eapache authored Nov 25, 2016
2 parents f4842fc + f0062da commit 353cc46
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 2 deletions.
28 changes: 26 additions & 2 deletions sync_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import "sync"
// The SyncProducer comes with two caveats: it will generally be less efficient than the AsyncProducer, and the actual
// durability guarantee provided when a message is acknowledged depend on the configured value of `Producer.RequiredAcks`.
// There are configurations where a message acknowledged by the SyncProducer can still sometimes be lost.
//
// For implementation reasons, the SyncProducer requires `Producer.Return.Errors` and `Producer.Return.Successes` to
// be set to true in its configuration.
type SyncProducer interface {

// SendMessage produces a given message, and returns only when it either has
Expand Down Expand Up @@ -36,6 +39,15 @@ type syncProducer struct {

// NewSyncProducer creates a new SyncProducer using the given broker addresses and configuration.
func NewSyncProducer(addrs []string, config *Config) (SyncProducer, error) {
if config == nil {
config = NewConfig()
config.Producer.Return.Successes = true
}

if err := verifyProducerConfig(config); err != nil {
return nil, err
}

p, err := NewAsyncProducer(addrs, config)
if err != nil {
return nil, err
Expand All @@ -46,6 +58,10 @@ func NewSyncProducer(addrs []string, config *Config) (SyncProducer, error) {
// NewSyncProducerFromClient creates a new SyncProducer using the given client. It is still
// necessary to call Close() on the underlying client when shutting down this producer.
func NewSyncProducerFromClient(client Client) (SyncProducer, error) {
if err := verifyProducerConfig(client.Config()); err != nil {
return nil, err
}

p, err := NewAsyncProducerFromClient(client)
if err != nil {
return nil, err
Expand All @@ -54,8 +70,6 @@ func NewSyncProducerFromClient(client Client) (SyncProducer, error) {
}

func newSyncProducerFromAsyncProducer(p *asyncProducer) *syncProducer {
p.conf.Producer.Return.Successes = true
p.conf.Producer.Return.Errors = true
sp := &syncProducer{producer: p}

sp.wg.Add(2)
Expand All @@ -65,6 +79,16 @@ func newSyncProducerFromAsyncProducer(p *asyncProducer) *syncProducer {
return sp
}

func verifyProducerConfig(config *Config) error {
if !config.Producer.Return.Errors {
return ConfigurationError("Producer.Return.Errors must be true to be used in a SyncProducer")
}
if !config.Producer.Return.Successes {
return ConfigurationError("Producer.Return.Successes must be true to be used in a SyncProducer")
}
return nil
}

func (sp *syncProducer) SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error) {
oldMetadata := msg.Metadata
defer func() {
Expand Down
3 changes: 3 additions & 0 deletions sync_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func TestSyncProducerBatch(t *testing.T) {

config := NewConfig()
config.Producer.Flush.Messages = 3
config.Producer.Return.Successes = true
producer, err := NewSyncProducer([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -116,6 +117,7 @@ func TestConcurrentSyncProducer(t *testing.T) {

config := NewConfig()
config.Producer.Flush.Messages = 100
config.Producer.Return.Successes = true
producer, err := NewSyncProducer([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -155,6 +157,7 @@ func TestSyncProducerToNonExistingTopic(t *testing.T) {
config := NewConfig()
config.Metadata.Retry.Max = 0
config.Producer.Retry.Max = 0
config.Producer.Return.Successes = true

producer, err := NewSyncProducer([]string{broker.Addr()}, config)
if err != nil {
Expand Down

0 comments on commit 353cc46

Please sign in to comment.