From 6d3f18d53311bba9e04681b14536a459b1ca6a4e Mon Sep 17 00:00:00 2001 From: Willem van Bergen Date: Mon, 9 Mar 2015 19:12:44 -0400 Subject: [PATCH] Add an example for using the async producer with goroutines for the successes and errors channel. --- producer_test.go | 82 ++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 76 insertions(+), 6 deletions(-) diff --git a/producer_test.go b/producer_test.go index e22efb8a3..c7bc1b2a4 100644 --- a/producer_test.go +++ b/producer_test.go @@ -1,7 +1,9 @@ package sarama import ( - "fmt" + "log" + "os" + "os/signal" "sync" "testing" ) @@ -507,25 +509,93 @@ func TestProducerMultipleRetries(t *testing.T) { closeProducer(t, producer) } -func ExampleProducer() { +// This example shows how to use the producer while simultaneously +// reading the Errors channel to know about any failures. +func ExampleProducer_select() { producer, err := NewProducer([]string{"localhost:9092"}, nil) if err != nil { panic(err) } + defer func() { if err := producer.Close(); err != nil { - panic(err) + log.Fatalln(err) } }() + // Trap SIGINT to trigger a shutdown. + signals := make(chan os.Signal, 1) + signal.Notify(signals, os.Interrupt) + + var enqueued, errors int +ProducerLoop: for { select { case producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder("testing 123")}: - fmt.Println("> message queued") + enqueued++ case err := <-producer.Errors(): - panic(err.Err) + log.Println("Failed to produce message", err) + errors++ + case <-signals: + break ProducerLoop + } + } + + log.Printf("Enqueued: %d; errors: %d\n", enqueued, errors) +} + +// This example shows how to use the producer with separate goroutines +// reading from the Successes and Errors channels. +func ExampleProducer_goroutines() { + config := NewConfig() + config.Producer.AckSuccesses = true + producer, err := NewProducer([]string{"localhost:9092"}, config) + if err != nil { + panic(err) + } + + // Trap SIGINT to trigger a graceful shutdown. + signals := make(chan os.Signal, 1) + signal.Notify(signals, os.Interrupt) + + var ( + wg sync.WaitGroup + enqueued, successes, errors int + ) + + wg.Add(1) + go func() { + defer wg.Done() + for _ = range producer.Successes() { + successes++ + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + for err := range producer.Errors() { + log.Println(err) + errors++ + } + }() + +ProducerLoop: + for { + message := &ProducerMessage{Topic: "my_topic", Value: StringEncoder("testing 123")} + select { + case producer.Input() <- message: + enqueued++ + + case <-signals: + producer.AsyncClose() // Trigger a shutdown of the producer. + break ProducerLoop } } + + wg.Wait() + + log.Printf("Successfully produced: %d; errors: %d\n", successes, errors) } func ExampleSyncProducer() { @@ -544,7 +614,7 @@ func ExampleSyncProducer() { if err != nil { panic(err) } else { - fmt.Printf("> message sent to partition %d at offset %d\n", partition, offset) + log.Printf("> message sent to partition %d at offset %d\n", partition, offset) } } }