-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathpublisher.go
226 lines (182 loc) · 6.64 KB
/
publisher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
package rabbitroutine
import (
"context"
"fmt"
"math"
"time"
"github.com/pkg/errors"
amqp "github.com/rabbitmq/amqp091-go"
)
// Sentinel errors returned by the publishers in this file.
var (
// ErrNotFound indicates that RabbitMQ entity doesn't exist.
// It is returned when the channel reports an error with the amqp.NotFound code.
ErrNotFound = errors.New("rabbitmq entity not found")
// ErrNoRoute indicates that no queue is bound that matches the routing key.
// It is returned when a published message comes back with the amqp.NoRoute reply code.
// @see: https://www.rabbitmq.com/amqp-0-9-1-errata.html#section_17
ErrNoRoute = errors.New("queue not bound")
)
// Publisher is the contract shared by the publishers in this file
// (EnsurePublisher, FireForgetPublisher and RetryPublisher).
type Publisher interface {
// Publish sends msg to the given RabbitMQ exchange using key as the routing key.
// ctx bounds how long the call may take; a non-nil error means the publish failed.
Publish(ctx context.Context, exchange, key string, msg amqp.Publishing) error
}
// EnsurePublisher implements the Publisher interface and guarantees delivery of the message to the server.
// When EnsurePublisher is used, publishing confirmation is enabled: Publish waits for the server's
// confirmation (or a returned message, a channel error, or context cancellation) before it returns.
// @see http://www.rabbitmq.com/blog/2011/02/10/introducing-publisher-confirms/
// @see https://www.rabbitmq.com/amqp-0-9-1-errata.html#section_17
type EnsurePublisher struct {
// pool supplies the channels used for publishing; they are obtained through ChannelWithConfirm.
pool *Pool
}
// NewEnsurePublisher builds an EnsurePublisher that takes its channels from p.
func NewEnsurePublisher(p *Pool) *EnsurePublisher {
	publisher := new(EnsurePublisher)
	publisher.pool = p

	return publisher
}
// Publish sends msg to an exchange on RabbitMQ and waits until the server
// has acknowledged that the message was received.
// It blocks until the message is delivered, ctx is done, or an error is received.
// ErrNoRoute is returned when no queue is bound that matches the routing key.
//
// While a reconnect is in progress the publish cannot complete, because no amqp.Channel can be obtained.
// The publisher knows nothing about the connection state, so from its point of view a reconnection
// looks the same as "the request took too long to finish".
// "Too long" is defined by the context.Context passed as the first argument:
// if ctx is done while waiting for the server's reply, the context's error is returned.
// If the connection is reestablished and there is still enough time left, the publish completes successfully.
//
// The channel goes back to the pool after a successful publish and after ErrNoRoute;
// after any other failure it is closed instead of being reused.
func (p *EnsurePublisher) Publish(ctx context.Context, exchange, key string, msg amqp.Publishing) error {
	keeper, err := p.pool.ChannelWithConfirm(ctx)
	if err != nil {
		return errors.Wrap(err, "failed to receive channel for publishing")
	}

	switch pubErr := p.publish(ctx, keeper, exchange, key, msg); pubErr {
	case nil, ErrNoRoute:
		// The channel itself is still healthy in both cases, so hand it back.
		p.pool.Release(keeper)
		return pubErr
	default:
		// Best effort: the publish error is what the caller needs to see.
		_ = keeper.Close() //nolint: gosec
		return pubErr
	}
}
// publish sends msg on the channel held by k with the mandatory flag set and then
// waits for the first of: ctx being done, a channel error, a returned message,
// or a publishing confirmation.
//
// It returns:
//   - nil when the server positively acknowledged the message;
//   - ErrNotFound when the channel reports an error with the amqp.NotFound code;
//   - ErrNoRoute when the message is returned with the amqp.NoRoute reply code;
//   - ctx.Err() when ctx is done first;
//   - a descriptive error for any other failure, including a negative
//     acknowledgement or notification channels that have already been closed.
func (p *EnsurePublisher) publish(ctx context.Context, k ChannelKeeper, exchange, key string, msg amqp.Publishing) error {
	ch := k.Channel()

	mandatory := true
	immediate := false

	err := ch.PublishWithContext(ctx, exchange, key, mandatory, immediate, msg)
	if err != nil {
		return errors.Wrap(err, "failed to publish message")
	}

	select {
	case <-ctx.Done():
		return ctx.Err()

	case amqpErr, ok := <-k.Error():
		// A receive from a closed channel yields a zero value. Reading its Code
		// would dereference nil if the element is a pointer (presumably *amqp.Error),
		// so report the closed channel explicitly instead.
		if !ok {
			return errors.Wrap(amqp.ErrClosed, "failed to deliver a message")
		}

		if amqpErr.Code == amqp.NotFound {
			return ErrNotFound
		}

		return errors.Wrap(amqpErr, "failed to deliver a message")

	case amqpRet := <-k.Return():
		if amqpRet.ReplyCode == amqp.NoRoute {
			// Unroutable mandatory or immediate messages are acknowledged immediately after
			// any Channel.NotifyReturn listeners have been notified.
			// Drain that confirmation so it is not mistaken for the next publish's one.
			<-k.Confirm()

			return ErrNoRoute
		}

		return fmt.Errorf("failed to deliver a message: %s", amqpRet.ReplyText)

	case confirmation := <-k.Confirm():
		// Only a positive acknowledgement means the server took responsibility
		// for the message. Ack is false both for a negative acknowledgement and
		// for the zero value read from a closed confirmation channel.
		if !confirmation.Ack {
			return errors.New("failed to deliver a message: not acknowledged by the server")
		}

		return nil
	}
}
// FireForgetPublisher implements the Publisher interface and is used to publish messages
// to a RabbitMQ exchange without delivery guarantees.
// When FireForgetPublisher is used, Publish does not wait for any publishing confirmation,
// so there are no delivery guarantees.
// @see http://www.rabbitmq.com/blog/2011/02/10/introducing-publisher-confirms/
type FireForgetPublisher struct {
// pool supplies the channels used for publishing.
pool *LightningPool
}
// NewFireForgetPublisher builds a FireForgetPublisher that takes its channels from p.
func NewFireForgetPublisher(p *LightningPool) *FireForgetPublisher {
	return &FireForgetPublisher{pool: p}
}
// Publish sends msg to an exchange on RabbitMQ and returns as soon as the
// message has been handed to the channel; it does not wait for a confirmation.
// The channel is returned to the pool on success and closed when publishing fails.
func (p *FireForgetPublisher) Publish(ctx context.Context, exchange, key string, msg amqp.Publishing) error {
	channel, err := p.pool.Channel(ctx)
	if err != nil {
		return errors.Wrap(err, "failed to receive channel for publishing")
	}

	const (
		mandatory = false
		immediate = false
	)

	if pubErr := channel.PublishWithContext(ctx, exchange, key, mandatory, immediate, msg); pubErr != nil {
		// Best effort: the publish error is what the caller needs to see.
		_ = channel.Close() //nolint: gosec

		return errors.Wrap(pubErr, "failed to publish message")
	}

	p.pool.Release(channel)

	return nil
}
// RetryPublisherOption describes a functional option for configuring RetryPublisher.
// NewRetryPublisher applies each option, in order, to the publisher it is building.
type RetryPublisherOption func(*RetryPublisher)
// RetryDelayFunc returns how long to wait before the next retry.
// attempt is the number of the attempt that has just failed; RetryPublisher counts attempts from 1.
type RetryDelayFunc func(attempt uint) time.Duration
// RetryPublisher wraps another Publisher and retries a failed publish until it succeeds,
// the attempt limit is reached, or the context is done.
type RetryPublisher struct {
// Publisher is the wrapped publisher that performs each individual attempt.
Publisher
// maxAttempts is the limit of publish attempts.
maxAttempts uint
// delayFn returns how long to wait before the next retry.
delayFn RetryDelayFunc
}
// NewRetryPublisher returns a new instance of RetryPublisher that wraps p.
// Unless overridden by opts, it allows math.MaxUint32 attempts and waits a
// constant 10 milliseconds between them. Options are applied in the order given.
func NewRetryPublisher(p Publisher, opts ...RetryPublisherOption) *RetryPublisher {
	retrier := new(RetryPublisher)
	retrier.Publisher = p
	retrier.maxAttempts = math.MaxUint32
	retrier.delayFn = ConstDelay(10 * time.Millisecond)

	for i := range opts {
		opts[i](retrier)
	}

	return retrier
}
// Publish sends msg to a RabbitMQ exchange through the wrapped Publisher,
// retrying failed attempts.
// It blocks until the message is published, the attempt limit is reached,
// or ctx is done.
//
// It returns:
//   - nil as soon as one attempt succeeds;
//   - ctx.Err() when ctx is done while waiting between attempts;
//   - the error of the last attempt when the attempt limit is reached.
//
// When the attempt limit is zero no attempt is made and nil is returned.
func (p *RetryPublisher) Publish(ctx context.Context, exchange, key string, msg amqp.Publishing) error {
	var err error

	for attempt := uint(1); attempt <= p.maxAttempts; attempt++ {
		err = p.Publisher.Publish(ctx, exchange, key, msg)
		if err == nil {
			return nil
		}

		// The attempt budget is spent: report the failure right away instead of
		// sleeping through a delay that no retry would follow. Leaving here also
		// keeps attempt from wrapping around when the limit is the largest uint.
		if attempt == p.maxAttempts {
			break
		}

		// An explicit timer (rather than time.After) can be stopped when ctx ends first.
		timer := time.NewTimer(p.delayFn(attempt))

		select {
		case <-timer.C:
		case <-ctx.Done():
			timer.Stop()

			return ctx.Err()
		}
	}

	return err
}
// PublishDelaySetup returns an option that makes RetryPublisher use fn to
// compute the delay before each retry.
func PublishDelaySetup(fn RetryDelayFunc) RetryPublisherOption {
	var option RetryPublisherOption = func(target *RetryPublisher) {
		target.delayFn = fn
	}

	return option
}
// PublishMaxAttemptsSetup returns an option that limits RetryPublisher to
// maxAttempts publish attempts.
func PublishMaxAttemptsSetup(maxAttempts uint) RetryPublisherOption {
	var option RetryPublisherOption = func(target *RetryPublisher) {
		target.maxAttempts = maxAttempts
	}

	return option
}
// ConstDelay returns a RetryDelayFunc that yields the same delay for every attempt.
func ConstDelay(delay time.Duration) RetryDelayFunc {
	return func(uint) time.Duration {
		return delay
	}
}
// LinearDelay returns a RetryDelayFunc whose delay grows linearly with the
// attempt number: attempt n waits n times delay.
func LinearDelay(delay time.Duration) RetryDelayFunc {
	return func(attempt uint) time.Duration {
		factor := time.Duration(attempt)

		return delay * factor
	}
}