diff --git a/consumer_test.go b/consumer_test.go index 066894cb3..58bbbca48 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -1,7 +1,9 @@ package sarama import ( - "fmt" + "log" + "os" + "os/signal" "sync" "testing" "time" @@ -243,54 +245,55 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) { seedBroker.Close() } +// This example shows how to use a consumer with a select statement +// dealing with the different channels. func ExampleConsumer_select() { master, err := NewConsumer([]string{"localhost:9092"}, nil) if err != nil { - panic(err) - } else { - fmt.Println("> master consumer ready") + log.Fatalln(err) } defer func() { if err := master.Close(); err != nil { - panic(err) + log.Fatalln(err) } }() consumer, err := master.ConsumePartition("my_topic", 0, 0) if err != nil { - panic(err) - } else { - fmt.Println("> consumer ready") + log.Fatalln(err) } defer func() { if err := consumer.Close(); err != nil { - panic(err) + log.Fatalln(err) } }() msgCount := 0 + signals := make(chan os.Signal, 1) + signal.Notify(signals, os.Interrupt) + consumerLoop: for { select { case err := <-consumer.Errors(): - panic(err) + log.Println(err) case <-consumer.Messages(): msgCount++ - case <-time.After(5 * time.Second): - fmt.Println("> timed out") + case <-signals: + log.Println("Received interrupt") break consumerLoop } } - fmt.Println("Got", msgCount, "messages.") + log.Println("Processed", msgCount, "messages.") } +// This example shows how to use a consumer with different goroutines +// to read from the Messages and Errors channels. func ExampleConsumer_goroutines() { master, err := NewConsumer([]string{"localhost:9092"}, nil) if err != nil { - panic(err) - } else { - fmt.Println("> master consumer ready") + log.Fatalln(err) } defer func() { if err := master.Close(); err != nil { @@ -300,15 +303,8 @@ func ExampleConsumer_goroutines() { consumer, err := master.ConsumePartition("my_topic", 0, OffsetOldest) if err != nil { - panic(err) - } else { - fmt.Println("> consumer ready") + log.Fatalln(err) } - defer func() { - if err := consumer.Close(); err != nil { - panic(err) - } - }() var ( wg sync.WaitGroup @@ -319,7 +315,7 @@ func ExampleConsumer_goroutines() { go func() { defer wg.Done() for message := range consumer.Messages() { - fmt.Printf("Consumed message with offset %d", message.Offset) + log.Printf("Consumed message with offset %d", message.Offset) msgCount++ } }() @@ -328,10 +324,17 @@ func ExampleConsumer_goroutines() { go func() { defer wg.Done() for err := range consumer.Errors() { - fmt.Println(err) + log.Println(err) } }() + // Wait for an interrupt signal to trigger the shutdown + signals := make(chan os.Signal, 1) + signal.Notify(signals, os.Interrupt) + <-signals + consumer.AsyncClose() + + // Wait for the Messages and Errors channel to be fully drained. wg.Wait() - fmt.Println("Got", msgCount, "messages.") + log.Println("Processed", msgCount, "messages.") } diff --git a/producer_test.go b/producer_test.go index c7bc1b2a4..12146b79d 100644 --- a/producer_test.go +++ b/producer_test.go @@ -545,7 +545,9 @@ ProducerLoop: } // This example shows how to use the producer with separate goroutines -// reading from the Successes and Errors channels. +// reading from the Successes and Errors channels. Note that in order +// for the Successes channel to be populated, you have to set +// config.Producer.AckSuccesses to true. func ExampleProducer_goroutines() { config := NewConfig() config.Producer.AckSuccesses = true @@ -598,23 +600,22 @@ ProducerLoop: log.Printf("Successfully produced: %d; errors: %d\n", successes, errors) } +// This example shows the basic usage pattern of the SyncProducer. func ExampleSyncProducer() { producer, err := NewSyncProducer([]string{"localhost:9092"}, nil) if err != nil { - panic(err) + log.Fatalln(err) } defer func() { if err := producer.Close(); err != nil { - panic(err) + log.Fatalln(err) } }() - for { - partition, offset, err := producer.SendMessage("my_topic", nil, StringEncoder("testing 123")) - if err != nil { - panic(err) - } else { - log.Printf("> message sent to partition %d at offset %d\n", partition, offset) - } + partition, offset, err := producer.SendMessage("my_topic", nil, StringEncoder("testing 123")) + if err != nil { + log.Printf("FAILED to send message: %s\n", err) + } else { + log.Printf("> message sent to partition %d at offset %d\n", partition, offset) } }