Skip to content

Commit

Permalink
Merge pull request #327 from Shopify/godoc_examples
Browse files Browse the repository at this point in the history
Clean up and document examples
  • Loading branch information
wvanbergen committed Mar 10, 2015
2 parents 14bae79 + 4a62390 commit 7cbd3ea
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 37 deletions.
57 changes: 30 additions & 27 deletions consumer_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package sarama

import (
"fmt"
"log"
"os"
"os/signal"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -243,54 +245,55 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
seedBroker.Close()
}

// This example shows how to use a consumer with a select statement
// dealing with the different channels.
func ExampleConsumer_select() {
master, err := NewConsumer([]string{"localhost:9092"}, nil)
if err != nil {
panic(err)
} else {
fmt.Println("> master consumer ready")
log.Fatalln(err)
}
defer func() {
if err := master.Close(); err != nil {
panic(err)
log.Fatalln(err)
}
}()

consumer, err := master.ConsumePartition("my_topic", 0, 0)
if err != nil {
panic(err)
} else {
fmt.Println("> consumer ready")
log.Fatalln(err)
}
defer func() {
if err := consumer.Close(); err != nil {
panic(err)
log.Fatalln(err)
}
}()

msgCount := 0

signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)

consumerLoop:
for {
select {
case err := <-consumer.Errors():
panic(err)
log.Println(err)
case <-consumer.Messages():
msgCount++
case <-time.After(5 * time.Second):
fmt.Println("> timed out")
case <-signals:
log.Println("Received interrupt")
break consumerLoop
}
}
fmt.Println("Got", msgCount, "messages.")
log.Println("Processed", msgCount, "messages.")
}

// This example shows how to use a consumer with different goroutines
// to read from the Messages and Errors channels.
func ExampleConsumer_goroutines() {
master, err := NewConsumer([]string{"localhost:9092"}, nil)
if err != nil {
panic(err)
} else {
fmt.Println("> master consumer ready")
log.Fatalln(err)
}
defer func() {
if err := master.Close(); err != nil {
Expand All @@ -300,15 +303,8 @@ func ExampleConsumer_goroutines() {

consumer, err := master.ConsumePartition("my_topic", 0, OffsetOldest)
if err != nil {
panic(err)
} else {
fmt.Println("> consumer ready")
log.Fatalln(err)
}
defer func() {
if err := consumer.Close(); err != nil {
panic(err)
}
}()

var (
wg sync.WaitGroup
Expand All @@ -319,7 +315,7 @@ func ExampleConsumer_goroutines() {
go func() {
defer wg.Done()
for message := range consumer.Messages() {
fmt.Printf("Consumed message with offset %d", message.Offset)
log.Printf("Consumed message with offset %d", message.Offset)
msgCount++
}
}()
Expand All @@ -328,10 +324,17 @@ func ExampleConsumer_goroutines() {
go func() {
defer wg.Done()
for err := range consumer.Errors() {
fmt.Println(err)
log.Println(err)
}
}()

// Wait for an interrupt signal to trigger the shutdown
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
<-signals
consumer.AsyncClose()

// Wait for the Messages and Errors channel to be fully drained.
wg.Wait()
fmt.Println("Got", msgCount, "messages.")
log.Println("Processed", msgCount, "messages.")
}
21 changes: 11 additions & 10 deletions producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,9 @@ ProducerLoop:
}

// This example shows how to use the producer with separate goroutines
// reading from the Successes and Errors channels.
// reading from the Successes and Errors channels. Note that in order
// for the Successes channel to be populated, you have to set
// config.Producer.AckSuccesses to true.
func ExampleProducer_goroutines() {
config := NewConfig()
config.Producer.AckSuccesses = true
Expand Down Expand Up @@ -598,23 +600,22 @@ ProducerLoop:
log.Printf("Successfully produced: %d; errors: %d\n", successes, errors)
}

// This example shows the basic usage pattern of the SyncProducer.
func ExampleSyncProducer() {
producer, err := NewSyncProducer([]string{"localhost:9092"}, nil)
if err != nil {
panic(err)
log.Fatalln(err)
}
defer func() {
if err := producer.Close(); err != nil {
panic(err)
log.Fatalln(err)
}
}()

for {
partition, offset, err := producer.SendMessage("my_topic", nil, StringEncoder("testing 123"))
if err != nil {
panic(err)
} else {
log.Printf("> message sent to partition %d at offset %d\n", partition, offset)
}
partition, offset, err := producer.SendMessage("my_topic", nil, StringEncoder("testing 123"))
if err != nil {
log.Printf("FAILED to send message: %s\n", err)
} else {
log.Printf("> message sent to partition %d at offset %d\n", partition, offset)
}
}

0 comments on commit 7cbd3ea

Please sign in to comment.