forked from optiopay/kafka
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathexample_test.go
133 lines (114 loc) · 2.86 KB
/
example_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package kafka
import (
"fmt"
"github.com/optiopay/kafka/proto"
)
// ExampleConsumer connects to a cluster, creates a consumer for the
// "my-messages" topic and prints every message it receives until the
// consumer reports ErrNoData. Any other error aborts the example with a
// panic.
func ExampleConsumer() {
	// connect to kafka cluster
	addresses := []string{"localhost:9092", "localhost:9093"}
	broker, err := Dial(addresses, NewBrokerConf("test"))
	if err != nil {
		panic(err)
	}
	defer broker.Close()

	// create new consumer
	conf := NewConsumerConf("my-messages", 0)
	conf.StartOffset = StartOffsetNewest
	consumer, err := broker.Consumer(conf)
	if err != nil {
		panic(err)
	}

	// read all messages
	for {
		msg, err := consumer.Consume()
		if err == ErrNoData {
			// nothing more to read, stop consuming
			break
		}
		if err != nil {
			panic(err)
		}
		// Printf adds no newline on its own; terminate each record so that
		// consecutive messages do not run together on a single line.
		fmt.Printf("message: %#v\n", msg)
	}
}
// ExampleOffsetCoordinator commits an offset for the "my-consumer-group"
// group and then reads it back, panicking if any step fails or if the value
// read back differs from the one committed.
func ExampleOffsetCoordinator() {
	// dial the kafka cluster using a list of addresses
	brokerAddrs := []string{"localhost:9092", "localhost:9093"}
	broker, dialErr := Dial(brokerAddrs, NewBrokerConf("test"))
	if dialErr != nil {
		panic(dialErr)
	}
	defer broker.Close()

	// build the coordinator configuration and adjust it before use
	coordConf := NewOffsetCoordinatorConf("my-consumer-group")
	coordConf.RetryErrLimit = 20
	coordinator, coordErr := broker.OffsetCoordinator(coordConf)
	if coordErr != nil {
		panic(coordErr)
	}

	// store the consumed message offset for the topic/partition
	commitErr := coordinator.Commit("my-topic", 0, 12)
	if commitErr != nil {
		panic(commitErr)
	}

	// fetch the latest consumed offset for the same topic/partition; the
	// second return value is not needed here
	committed, _, fetchErr := coordinator.Offset("my-topic", 0)
	if fetchErr != nil {
		panic(fetchErr)
	}
	if committed != 12 {
		panic(fmt.Sprintf("offset is %d, not 12", committed))
	}
}
// ExampleProducer sends two messages to the "my-messages" topic with a
// single Produce call and panics if dialing or producing fails.
func ExampleProducer() {
	// dial the kafka cluster
	brokerAddrs := []string{"localhost:9092", "localhost:9093"}
	broker, dialErr := Dial(brokerAddrs, NewBrokerConf("test"))
	if dialErr != nil {
		panic(dialErr)
	}
	defer broker.Close()

	// configure and build the producer
	producerConf := NewProducerConf()
	producerConf.RequiredAcks = proto.RequiredAcksLocal
	producer := broker.Producer(producerConf)

	// both messages go out in a single call to make the write atomic
	batch := []*proto.Message{
		{Value: []byte("first")},
		{Value: []byte("second")},
	}
	_, produceErr := producer.Produce("my-messages", 0, batch...)
	if produceErr != nil {
		panic(produceErr)
	}
}
// ExampleMerge creates one consumer per topic, combines them with Merge and
// prints every message received through the merged consumer. The consume
// loop has no exit condition other than a panic on error.
func ExampleMerge() {
	// connect to kafka cluster
	addresses := []string{"localhost:9092", "localhost:9093"}
	broker, err := Dial(addresses, NewBrokerConf("test"))
	if err != nil {
		panic(err)
	}
	defer broker.Close()

	topics := []string{"fruits", "vegetables"}
	fetchers := make([]Consumer, len(topics))

	// create consumers for different topics
	for i, topic := range topics {
		conf := NewConsumerConf(topic, 0)
		conf.RetryLimit = 20
		conf.StartOffset = StartOffsetNewest
		consumer, err := broker.Consumer(conf)
		if err != nil {
			panic(err)
		}
		fetchers[i] = consumer
	}

	// merge all created consumers (they don't even have to belong to the same broker!)
	mx := Merge(fetchers...)
	defer mx.Close()

	// consume messages from all sources
	for {
		msg, err := mx.Consume()
		if err != nil {
			panic(err)
		}
		// Printf adds no newline on its own; terminate each record so that
		// consecutive messages do not run together on a single line.
		fmt.Printf("message: %#v\n", msg)
	}
}