-
Notifications
You must be signed in to change notification settings - Fork 1
/
example_test.go
149 lines (125 loc) · 2.92 KB
/
example_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package xkafka
import (
"context"
)
// ExampleConsumer builds a Consumer from a handler and a set of options,
// attaches a middleware, and runs it with a background context.
func ExampleConsumer() {
	handler := HandlerFunc(func(ctx context.Context, msg *Message) error {
		// do something with the message

		// acknowledge the message with success, skip or error
		msg.AckSuccess()

		return nil
	})

	ignoreError := func(err error) error {
		// ignore error
		return nil
	}

	consumer, err := NewConsumer("consumer-id", handler,
		Concurrency(10), // default is 1. values > 1 enable async processing
		Topics{"test"},
		Brokers{"localhost:9092"},
		// default behavior is to stop the consumer. this option allows customizing the error handling
		ErrorHandler(ignoreError),
		// custom configuration for the underlying kafka consumer
		ConfigMap{
			"auto.commit.interval.ms": 1000,
		},
		// default behavior is to commit messages automatically.
		// this option triggers manual commit after each message is processed.
		ManualCommit(true),
	)
	if err != nil {
		panic(err)
	}
	// Deferred so the consumer is closed on every exit path, including the
	// panic below when Run returns an error.
	defer consumer.Close()

	consumer.Use(
		// middleware to log messages
		MiddlewareFunc(func(next Handler) Handler {
			return HandlerFunc(func(ctx context.Context, msg *Message) error {
				// log the message

				return next.Handle(ctx, msg)
			})
		}),
	)

	if err := consumer.Run(context.Background()); err != nil {
		panic(err)
	}
}
// ExampleProducer builds a Producer, attaches a middleware, starts Run in a
// separate goroutine and publishes a single message with Publish.
func ExampleProducer() {
	ctx, cancel := context.WithCancel(context.Background())
	// cancel the context to stop the producer. Deferred so the context is
	// cancelled on every exit path, including the panics below.
	defer cancel()

	producer, err := NewProducer(
		"producer-id",
		Brokers{"localhost:9092"},
		ConfigMap{
			"socket.keepalive.enable": true,
		},
	)
	if err != nil {
		panic(err)
	}

	producer.Use(
		// middleware to log messages
		MiddlewareFunc(func(next Handler) Handler {
			return HandlerFunc(func(ctx context.Context, msg *Message) error {
				// log the message

				return next.Handle(ctx, msg)
			})
		}),
	)

	// Run is started in its own goroutine and receives the cancellable ctx.
	go func() {
		if err := producer.Run(ctx); err != nil {
			panic(err)
		}
	}()

	msg := &Message{
		Topic: "test",
		Key:   []byte("key"),
		Value: []byte("value"),
	}

	if err := producer.Publish(ctx, msg); err != nil {
		panic(err)
	}
}
// ExampleProducer_AsyncPublish builds a Producer with a default delivery
// callback, starts Run in a separate goroutine, and hands one message with
// its own additional callback to AsyncPublish.
func ExampleProducer_AsyncPublish() {
	ctx, cancel := context.WithCancel(context.Background())
	// cancel the context to stop the producer. Deferred so the context is
	// cancelled on every exit path, including the panics below.
	defer cancel()

	// default callback function called after each message
	// handled by the producer
	callback := func(msg *Message) {
		// do something with the message
	}

	producer, err := NewProducer(
		"producer-id",
		Brokers{"localhost:9092"},
		ConfigMap{
			"socket.keepalive.enable": true,
		},
		DeliveryCallback(callback),
	)
	if err != nil {
		panic(err)
	}

	// Run is started in its own goroutine and receives the cancellable ctx.
	go func() {
		if err := producer.Run(ctx); err != nil {
			panic(err)
		}
	}()

	msg := &Message{
		Topic: "test",
		Key:   []byte("key"),
		Value: []byte("value"),
	}

	// each message can have its own callback function
	// in addition to the default callback function
	msg.AddCallback(func(m *Message) {
		// do something with the message
	})

	if err := producer.AsyncPublish(ctx, msg); err != nil {
		panic(err)
	}
}