forked from porthos-rpc/porthos-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbroker.go
120 lines (90 loc) · 2.24 KB
/
broker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package porthos
import (
"sync"
"time"
"github.com/porthos-rpc/porthos-go/log"
"github.com/streadway/amqp"
)
// Broker holds an implementation-specific connection.
//
// It bundles one AMQP connection with the URL needed to dial it again and
// the channels that are signalled after a successful reconnection.
type Broker struct {
// config carries the reconnection settings (see Config).
config Config
// connection is the current AMQP connection; reestablish replaces it.
connection *amqp.Connection
// m is held by Close, openChannel and reestablish while they touch connection.
m sync.Mutex
// url is the AMQP URL the broker was created with; reestablish dials it again.
url string
// closed is the loop condition of handleConnectionClose: once it is true
// the reconnection loop stops.
closed bool
// reestablishs collects the channels handed out by NotifyReestablish; each
// one is sent true after a successful reconnection.
reestablishs []chan bool
}
// Config to be used when creating a new connection.
type Config struct {
// reconnectInterval is how long handleConnectionClose sleeps after a failed
// reconnection attempt before trying again.
reconnectInterval time.Duration
}
// NewBroker creates a new instance of AMQP connection.
//
// It is shorthand for NewBrokerConfig with the default configuration, which
// waits one second between failed reconnection attempts.
func NewBroker(amqpURL string) (*Broker, error) {
	defaults := Config{reconnectInterval: time.Second}
	return NewBrokerConfig(amqpURL, defaults)
}
// NewBrokerConfig returns an AMQP Connection.
//
// It dials amqpURL and, on success, starts a background goroutine
// (handleConnectionClose) that dials the broker again whenever the
// connection closes. When the dial fails the error is returned unchanged and
// no Broker is created.
//
// A non-positive reconnectInterval falls back to one second, the same value
// NewBroker uses.
func NewBrokerConfig(amqpURL string, config Config) (*Broker, error) {
	// The zero Config has reconnectInterval == 0, which would make the sleep
	// between failed reconnection attempts return immediately and turn the
	// retry loop into a busy loop against the broker.
	if config.reconnectInterval <= 0 {
		config.reconnectInterval = 1 * time.Second
	}

	conn, err := amqp.Dial(amqpURL)
	if err != nil {
		return nil, err
	}

	b := &Broker{
		connection: conn,
		url:        amqpURL,
		config:     config,
	}

	go b.handleConnectionClose()

	return b, nil
}
// Close the broker connection.
//
// Close marks the broker as closed before closing the underlying connection,
// so the reconnection loop in handleConnectionClose stops instead of dialing
// a fresh connection right after an intentional shutdown. Calling Close more
// than once is a no-op.
func (b *Broker) Close() {
	b.m.Lock()
	defer b.m.Unlock()

	if b.closed {
		return
	}

	// Set the flag first: closing the connection wakes the supervising
	// goroutine, which must already see closed == true when it re-checks.
	b.closed = true

	// Guard against a missing connection so Close never panics.
	if b.connection == nil {
		return
	}

	// The error is deliberately dropped: Close has no error return, and a
	// failure here means the connection is already gone.
	_ = b.connection.Close()
}
// NotifyConnectionClose writes in the returned channel when the connection with the broker closes.
//
// Exactly one value is delivered: the close error, or a nil error when the
// connection was closed gracefully. The returned channel is buffered, so the
// forwarding goroutine finishes even if the caller never reads from it.
func (b *Broker) NotifyConnectionClose() <-chan error {
	ch := make(chan error, 1)

	// Read the connection under the mutex because reestablish may be
	// replacing it concurrently.
	b.m.Lock()
	conn := b.connection
	b.m.Unlock()

	// Without a connection there is nothing to watch; report it as closed
	// instead of dereferencing nil.
	if conn == nil {
		ch <- amqp.ErrClosed
		return ch
	}

	// Register the listener before returning so a close that happens right
	// after this call is still observed.
	closes := conn.NotifyClose(make(chan *amqp.Error, 1))

	go func() {
		// On a graceful close the amqp channel is closed without a value and
		// the receive yields a nil *amqp.Error. Forwarding that pointer as-is
		// would wrap it in a non-nil error interface, so send an untyped nil.
		if amqpErr := <-closes; amqpErr != nil {
			ch <- amqpErr
			return
		}
		ch <- nil
	}()

	return ch
}
// NotifyReestablish returns a channel to notify when the connection is reestablished.
//
// The channel receives true after each successful reconnection. It has a
// buffer of one so that a listener which is briefly busy does not block the
// reconnection loop on the first pending notification.
func (b *Broker) NotifyReestablish() <-chan bool {
	receiver := make(chan bool, 1)

	// The listener list is also read by the background reconnection
	// goroutine, so the append must happen under the mutex.
	b.m.Lock()
	b.reestablishs = append(b.reestablishs, receiver)
	b.m.Unlock()

	return receiver
}
// openChannel opens a new AMQP channel on the current connection.
//
// It returns amqp.ErrClosed when the broker currently has no connection,
// rather than panicking on a nil pointer.
func (b *Broker) openChannel() (*amqp.Channel, error) {
	b.m.Lock()
	defer b.m.Unlock()

	if b.connection == nil {
		return nil, amqp.ErrClosed
	}

	return b.connection.Channel()
}
// reestablish dials the broker URL again and, on success, installs the new
// connection as the broker's current one.
//
// It returns the dial error when the broker cannot be reached, and
// amqp.ErrClosed when the broker was closed while the dial was in flight.
// In both cases the stored connection is left untouched.
func (b *Broker) reestablish() error {
	conn, err := amqp.Dial(b.url)
	if err != nil {
		// Keep the previous connection pointer: storing the nil result of a
		// failed dial would make later uses of the connection panic.
		return err
	}

	b.m.Lock()
	defer b.m.Unlock()

	// If the broker was closed in the meantime, drop the fresh connection
	// instead of leaking it behind a closed broker.
	if b.closed {
		_ = conn.Close()
		return amqp.ErrClosed
	}

	b.connection = conn
	return nil
}
// handleConnectionClose supervises the broker connection until the broker is
// closed.
//
// It waits for the current connection to close, then calls reestablish
// repeatedly, sleeping config.reconnectInterval after each failure. After a
// successful reconnection every channel registered through NotifyReestablish
// is sent true, and the function goes back to waiting for the next close.
func (b *Broker) handleConnectionClose() {
	// isClosed reads the closed flag under the mutex; the flag is shared with
	// other goroutines, so an unsynchronised read would be a data race.
	isClosed := func() bool {
		b.m.Lock()
		defer b.m.Unlock()
		return b.closed
	}

	for !isClosed() {
		// Block until the current connection goes away.
		<-b.NotifyConnectionClose()

		for attempt := 0; !isClosed(); attempt++ {
			if err := b.reestablish(); err != nil {
				log.Error("Error reestablishing connection, attempt %d. Retrying... [%s]", attempt, err)
				time.Sleep(b.config.reconnectInterval)
				continue
			}

			log.Info("Connection reestablished")

			// Copy the listener list under the mutex and send outside it, so
			// a slow listener never blocks code that needs the lock.
			b.m.Lock()
			listeners := append([]chan bool(nil), b.reestablishs...)
			b.m.Unlock()

			// The send blocks on purpose: a non-blocking send could silently
			// drop the notification for a listener that is not yet receiving.
			for _, c := range listeners {
				c <- true
			}

			break
		}
	}
}