-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathconnector.go
360 lines (294 loc) · 8.79 KB
/
connector.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
package rabbitroutine
import (
"context"
"math"
"sync"
"time"
"github.com/pkg/errors"
amqp "github.com/rabbitmq/amqp091-go"
"golang.org/x/sync/errgroup"
)
// Defaults that Dial puts into the amqp.Config it passes to DialConfig.
const (
// defaultHeartbeat is the Heartbeat value used by Dial.
defaultHeartbeat = 10 * time.Second
// defaultLocale is the Locale value used by Dial.
defaultLocale = "en_US"
)
// Config stores reconnect options.
type Config struct {
// ReconnectAttempts defines how many connection attempts are made in a row
// before dialing gives up.
// The attempt counter starts from the beginning for every dial sequence,
// so each time the connection is lost there are up to ReconnectAttempts attempts to reconnect.
// When the maximum number of attempts is exceeded, Dial or DialConfig just returns an error;
// it is up to the caller to handle this situation.
// NewConnector replaces a zero value with math.MaxUint32.
// In general it is better to keep ReconnectAttempts unlimited and log errors
// using Connector.AddRetriedListener (see examples dir).
ReconnectAttempts uint
// Wait is how long to wait between reconnect attempts
// and before a consumer is restarted after an error.
Wait time.Duration
}
// Connector implements RabbitMQ failover.
type Connector struct {
// cfg holds the reconnect options passed to NewConnector.
cfg Config
// conn is the connection most recently assigned by dialWithIt.
conn *amqp.Connection
// connCh is the channel through which connBroadcast hands conn to Channel callers.
connCh chan *amqp.Connection
// retries has list of Retried event handlers.
retries []func(Retried)
// dials has list of Dialed event handlers.
dials []func(Dialed)
// amqpnotifies has list of AMQPNotified event handlers.
amqpnotifies []func(AMQPNotified)
}
// NewConnector builds a Connector that uses cfg as its reconnect options.
// A zero cfg.ReconnectAttempts is replaced with math.MaxUint32.
func NewConnector(cfg Config) *Connector {
	connector := new(Connector)
	connector.connCh = make(chan *amqp.Connection)

	if cfg.ReconnectAttempts == 0 {
		cfg.ReconnectAttempts = math.MaxUint32
	}

	connector.cfg = cfg

	return connector
}
// StartMultipleConsumers is used to start Consumer "count" times.
// For every (re)start Declare is called once on a dedicated channel, and then
// Consume is called "count" times (one goroutine and one channel per call),
// so you can scale consumer horizontally.
//
// When Consume returns, a consume channel gets closed, or a channel cannot be
// allocated, all consumers of the current round are cancelled and the whole
// sequence is started again (after c.cfg.Wait if the round ended with an error).
//
// It returns only after ctx is done: ctx.Err() when that happens while waiting
// before a restart, otherwise the error of the last round (which may be nil).
//
// NOTE: It's blocking method.
// nolint: gocyclo
func (c *Connector) StartMultipleConsumers(ctx context.Context, consumer Consumer, count int) error {
	var lastErr error

	for {
		if contextDone(ctx) {
			return lastErr
		}

		// On error wait for c.cfg.Wait time before consumer restart
		if lastErr != nil {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-time.After(c.cfg.Wait):
			}
		}

		// Use declareChannel only for consumer.Declare,
		// and close it after declaring.
		// nolint: vetshadow
		declareChannel, err := c.Channel(ctx)
		if err != nil {
			lastErr = errors.WithMessage(err, "failed to get channel")

			continue
		}

		err = consumer.Declare(ctx, declareChannel)
		if err != nil {
			lastErr = errors.WithMessage(err, "failed to declare consumer")

			// Do not leak the channel when declaring failed.
			// The close error is deliberately ignored: the declare error is
			// the one worth reporting, and the channel may already be closed.
			_ = declareChannel.Close()

			continue
		}

		err = declareChannel.Close()
		if err != nil {
			lastErr = errors.WithMessage(err, "failed to close declareChannel")

			continue
		}

		var g errgroup.Group

		// allocErr remembers a failed channel allocation, so the failure is
		// reported (and the restart delayed) even if no consumer was started.
		var allocErr error

		consumeCtx, cancel := context.WithCancel(ctx)

		for i := 0; i < count; i++ {
			// Allocate new channel for each consumer.
			// nolint: vetshadow
			consumeChannel, err := c.Channel(consumeCtx)
			if err != nil {
				allocErr = errors.WithMessage(err, "failed to get channel")

				// If we got error then stop all previously started consumers
				// and wait before they will be finished.
				cancel()

				break
			}

			// once guarantees that consumeChannel.Close is called by only one
			// of the two goroutines below.
			var once sync.Once

			closeCh := consumeChannel.NotifyClose(make(chan *amqp.Error, 1))

			// Start two goroutine: one for consuming and second for close notification receiving.
			// When close notification received via closeCh, then all consumers get notification via consumeCtx.
			// In this case consuming must be finished and then goroutine will finish their work.

			g.Go(func() error {
				// On consume exit send stop signal to all consumer's goroutines.
				defer cancel()

				// nolint: vetshadow
				err := consumer.Consume(consumeCtx, consumeChannel)
				if err != nil {
					return errors.Wrap(err, "failed to consume")
				}

				var closeErr error

				once.Do(func() {
					closeErr = consumeChannel.Close()
				})

				if closeErr != nil && closeErr != amqp.ErrClosed {
					return errors.Wrap(closeErr, "failed to close amqp channel")
				}

				return nil
			})

			g.Go(func() error {
				// On amqp error send stop signal to all consumer's goroutines.
				defer cancel()

				var stopErr error

				select {
				case <-consumeCtx.Done():
					stopErr = consumeCtx.Err()
				case amqpErr := <-closeCh:
					c.emitAMQPNotified(AMQPNotified{amqpErr})

					// A receive from a closed closeCh yields a nil *amqp.Error.
					// Storing that nil pointer in an error interface would give
					// a non-nil error that panics when Error() is called, so
					// report the closed channel with amqp.ErrClosed instead.
					if amqpErr != nil {
						stopErr = amqpErr
					} else {
						stopErr = amqp.ErrClosed
					}
				}

				var closeErr error

				once.Do(func() {
					closeErr = consumeChannel.Close()
				})

				if closeErr != nil && closeErr != amqp.ErrClosed {
					return errors.Wrap(closeErr, "failed to close amqp channel")
				}

				return stopErr
			})
		}

		lastErr = g.Wait()
		if lastErr == nil {
			lastErr = allocErr
		}

		cancel()
	}
}
// StartConsumer runs consumer by delegating to StartMultipleConsumers
// with the consumer count fixed to one.
//
// NOTE: It's blocking method.
func (c *Connector) StartConsumer(ctx context.Context, consumer Consumer) error {
	const singleConsumer = 1

	err := c.StartMultipleConsumers(ctx, consumer, singleConsumer)

	return err
}
// Channel opens a new amqp.Channel on the connection received from c.connCh.
// If the returned channel fails later, a new one should be requested.
// It returns ctx.Err() when ctx is done before a connection is received.
//
// NOTE: It's blocking method. (It waits until a connection is offered on c.connCh.)
func (c *Connector) Channel(ctx context.Context) (*amqp.Channel, error) {
	var connection *amqp.Connection

	select {
	case connection = <-c.connCh:
	case <-ctx.Done():
		return nil, ctx.Err()
	}

	return connection.Channel()
}
// AddRetriedListener appends h to the list of handlers that are called
// for every Retried event (a failed attempt to establish a connection).
//
// NOTE: not concurrency-safe.
func (c *Connector) AddRetriedListener(h func(Retried)) {
	updated := append(c.retries, h)
	c.retries = updated
}
// emitRetried passes r to every registered Retried handler,
// in the order the handlers were added.
func (c *Connector) emitRetried(r Retried) {
	handlers := c.retries
	for i := range handlers {
		handlers[i](r)
	}
}
// AddDialedListener appends h to the list of handlers that are called
// for every Dialed event (a connection was successfully established).
//
// NOTE: not concurrency-safe.
func (c *Connector) AddDialedListener(h func(r Dialed)) {
	updated := append(c.dials, h)
	c.dials = updated
}
// emitDialed passes d to every registered Dialed handler,
// in the order the handlers were added.
func (c *Connector) emitDialed(d Dialed) {
	handlers := c.dials
	for i := range handlers {
		handlers[i](d)
	}
}
// AddAMQPNotifiedListener appends h to the list of handlers that are called
// for every AMQPNotified event (an AMQP error was received).
//
// NOTE: not concurrency-safe.
func (c *Connector) AddAMQPNotifiedListener(h func(n AMQPNotified)) {
	updated := append(c.amqpnotifies, h)
	c.amqpnotifies = updated
}
// emitAMQPNotified passes n to every registered AMQPNotified handler,
// in the order the handlers were added.
func (c *Connector) emitAMQPNotified(n AMQPNotified) {
	handlers := c.amqpnotifies
	for i := range handlers {
		handlers[i](n)
	}
}
// dialWithIt tries to connect to RabbitMQ and stores the result in c.conn.
// After every failed attempt it notifies the Retried handlers and waits
// c.cfg.Wait before the next attempt.
// It returns nil on the first successful dial, ctx.Err() if ctx is done while
// waiting, and the last dial error annotated with "maximum attempts exceeded"
// once c.cfg.ReconnectAttempts attempts have failed.
func (c *Connector) dialWithIt(ctx context.Context, url string, config amqp.Config) error {
	var dialErr error

	attempt := uint(1)
	for attempt <= c.cfg.ReconnectAttempts {
		c.conn, dialErr = amqp.DialConfig(url, config)
		if dialErr == nil {
			return nil
		}

		c.emitRetried(Retried{attempt, dialErr})

		select {
		case <-time.After(c.cfg.Wait):
		case <-ctx.Done():
			return ctx.Err()
		}

		attempt++
	}

	return errors.WithMessage(dialErr, "maximum attempts exceeded")
}
// connBroadcast keeps offering c.conn on c.connCh, one send per receiver,
// and returns as soon as ctx is done.
func (c *Connector) connBroadcast(ctx context.Context) {
	done := ctx.Done()

	for {
		select {
		case c.connCh <- c.conn:
		case <-done:
			return
		}
	}
}
// Dial calls DialConfig with an amqp.Config whose Heartbeat is
// defaultHeartbeat and whose Locale is defaultLocale.
// It tries to keep the RabbitMQ connection active
// by catching and handling connection errors,
// and returns whatever DialConfig returns.
//
// NOTE: It's blocking method.
func (c *Connector) Dial(ctx context.Context, url string) error {
	var config amqp.Config

	config.Heartbeat = defaultHeartbeat
	config.Locale = defaultLocale

	return c.DialConfig(ctx, url, config)
}
// DialConfig used to configure RabbitMQ connection with amqp.Config.
// It will try to keep RabbitMQ connection active
// by catching and handling connection errors: each time the connection
// reports a close through NotifyClose, a new dial sequence is started.
// It returns when ctx is done or when dialWithIt returns an error
// (for example after the maximum number of reconnect attempts is exceeded).
//
// NOTE: It's blocking method.
func (c *Connector) DialConfig(ctx context.Context, url string, config amqp.Config) error {
for {
err := c.dialWithIt(ctx, url, config)
if err != nil {
return errors.WithMessage(err, "failed to dial")
}
// In the case of connection problems,
// we will get an error from closeCh
closeCh := c.conn.NotifyClose(make(chan *amqp.Error, 1))
// After context cancellation we must wait for finishing of connBroadcast to avoid data race on c.conn.
var wg sync.WaitGroup
wg.Add(1)
broadcastCtx, cancel := context.WithCancel(ctx)
// Offer c.conn to Channel callers until broadcastCtx is cancelled.
go func() {
c.connBroadcast(broadcastCtx)
wg.Done()
}()
c.emitDialed(Dialed{})
select {
case <-ctx.Done():
// Stop broadcasting first, then close the connection.
cancel()
wg.Wait()
err = c.conn.Close()
// It's not error if connection has already been closed.
if err != nil && err != amqp.ErrClosed {
return errors.WithMessage(err, "failed to close rabbitmq connection")
}
return ctx.Err()
case amqpErr := <-closeCh:
// The connection was closed: stop broadcasting it before the next
// loop iteration lets dialWithIt overwrite c.conn.
cancel()
wg.Wait()
c.emitAMQPNotified(AMQPNotified{amqpErr})
}
}
}
// contextDone reports, without blocking, whether ctx.Done() is already closed.
func contextDone(ctx context.Context) bool {
	select {
	case <-ctx.Done():
		return true
	default:
		return false
	}
}