forked from gusev-klara/xk6-amqp
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathexchanges.go
161 lines (147 loc) · 3.75 KB
/
exchanges.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
package amqp
import (
"fmt"
amqpDriver "github.com/rabbitmq/amqp091-go"
)
// Exchange is the entry point for exchange management (declare, delete, bind,
// unbind). It does not own connections itself; it reads them through the
// pointers below.
type Exchange struct {
	// Version is a free-form version string.
	// NOTE(review): not read anywhere in this file — confirm who sets/uses it.
	Version string
	// Connections points at the registry of connections keyed by connection ID.
	// GetConn looks connections up in this map.
	Connections *map[int]*amqpDriver.Connection
	// MaxConnID points at the ID that GetConn uses when it is asked for
	// connection ID 0 (documented there as the last initialised connection).
	MaxConnID *int
}
// ExchangeOptions holds the configuration needed to reach an exchange.
type ExchangeOptions struct {
	// ConnectionURL is the URL of the server to connect to.
	// NOTE(review): not read in this file; presumably an AMQP URL — confirm
	// against the code that consumes it.
	ConnectionURL string
}
// ExchangeDeclareOptions carries the arguments Declare needs to create an
// exchange. Apart from ConnectionID, every field is forwarded unchanged to the
// driver's ExchangeDeclare call.
type ExchangeDeclareOptions struct {
	// ConnectionID selects the connection via GetConn; 0 means the connection
	// stored under MaxConnID.
	ConnectionID int
	// Name is the name of the exchange to declare.
	Name string
	// Kind is the exchange type passed to the driver.
	// NOTE(review): presumably one of the broker's exchange types
	// (e.g. "direct", "fanout", "topic", "headers") — confirm with driver docs.
	Kind string
	// Durable is forwarded as the driver's durable flag.
	Durable bool
	// AutoDelete is forwarded as the driver's autoDelete flag.
	AutoDelete bool
	// Internal is forwarded as the driver's internal flag.
	Internal bool
	// NoWait is forwarded as the driver's noWait flag.
	NoWait bool
	// Args is the table of additional arguments handed to the driver.
	Args amqpDriver.Table
}
// ExchangeDeleteOptions carries the options Delete accepts when removing an
// exchange.
type ExchangeDeleteOptions struct {
	// ConnectionID selects the connection via GetConn; 0 means the connection
	// stored under MaxConnID.
	ConnectionID int
}
// ExchangeBindOptions carries the arguments Bind needs to bind (subscribe) one
// exchange to another. Apart from ConnectionID, every field is forwarded
// unchanged to the driver's ExchangeBind call.
type ExchangeBindOptions struct {
	// ConnectionID selects the connection via GetConn; 0 means the connection
	// stored under MaxConnID.
	ConnectionID int
	// DestinationExchangeName is passed as the destination of the binding.
	DestinationExchangeName string
	// SourceExchangeName is passed as the source of the binding.
	SourceExchangeName string
	// RoutingKey is passed as the binding key.
	RoutingKey string
	// NoWait is forwarded as the driver's noWait flag.
	NoWait bool
	// Args is the table of additional arguments handed to the driver.
	Args amqpDriver.Table
}
// ExchangeUnbindOptions carries the arguments Unbind needs to unbind
// (unsubscribe) one exchange from another. Apart from ConnectionID, every field
// is forwarded unchanged to the driver's ExchangeUnbind call.
type ExchangeUnbindOptions struct {
	// ConnectionID selects the connection via GetConn; 0 means the connection
	// stored under MaxConnID.
	ConnectionID int
	// DestinationExchangeName is passed as the destination of the binding to remove.
	DestinationExchangeName string
	// SourceExchangeName is passed as the source of the binding to remove.
	SourceExchangeName string
	// RoutingKey is passed as the binding key of the binding to remove.
	RoutingKey string
	// NoWait is forwarded as the driver's noWait flag.
	NoWait bool
	// Args is the table of additional arguments handed to the driver.
	Args amqpDriver.Table
}
// GetConn returns the initialised connection registered under connID. A connID
// of 0 selects the connection registered under *MaxConnID, i.e. the last
// initialised one.
//
// An error is returned when no connection is registered under the resolved ID,
// or when the Exchange has not been wired up (nil receiver, nil Connections or
// nil MaxConnID); previously those unwired cases panicked on a nil pointer
// dereference. For backward compatibility the first return value on error is
// still a non-nil, empty placeholder connection; callers must check the error
// and must not use the placeholder.
func (exchange *Exchange) GetConn(connID int) (*amqpDriver.Connection, error) {
	useDefault := connID == 0

	// notInitialised builds the error result, keeping the two historical
	// messages: a generic one for the default lookup and one naming the ID.
	notInitialised := func() (*amqpDriver.Connection, error) {
		if useDefault {
			return &amqpDriver.Connection{}, fmt.Errorf("connection not initialised")
		}
		return &amqpDriver.Connection{}, fmt.Errorf("connection with ID %d not initialised", connID)
	}

	if exchange == nil || exchange.Connections == nil {
		return notInitialised()
	}

	lookupID := connID
	if useDefault {
		if exchange.MaxConnID == nil {
			return notInitialised()
		}
		lookupID = *exchange.MaxConnID
	}

	// Indexing a nil or empty map yields a nil connection, which is reported
	// as "not initialised" below.
	conn := (*exchange.Connections)[lookupID]
	if conn == nil {
		return notInitialised()
	}
	return conn, nil
}
// Declare creates an exchange described by options. It resolves the connection
// named by options.ConnectionID, opens a channel on it for the duration of the
// call, and forwards the remaining options to the driver's ExchangeDeclare.
// Any error from resolving the connection, opening the channel, or declaring
// the exchange is returned unchanged.
func (exchange *Exchange) Declare(options ExchangeDeclareOptions) error {
	connection, connErr := exchange.GetConn(options.ConnectionID)
	if connErr != nil {
		return connErr
	}

	channel, chanErr := connection.Channel()
	if chanErr != nil {
		return chanErr
	}
	// Closing the channel is best effort: its error is deliberately dropped so
	// the result of the declare call is what the caller sees.
	defer func() { _ = channel.Close() }()

	declareErr := channel.ExchangeDeclare(
		options.Name,
		options.Kind,
		options.Durable,
		options.AutoDelete,
		options.Internal,
		options.NoWait,
		options.Args,
	)
	return declareErr
}
// Delete removes the exchange called name from the remote server. It resolves
// the connection named by options.ConnectionID and opens a channel on it for
// the duration of the call. The exchange is deleted with both ifUnused and
// noWait disabled. Any error from resolving the connection, opening the
// channel, or deleting the exchange is returned unchanged.
func (exchange *Exchange) Delete(name string, options ExchangeDeleteOptions) error {
	// Fixed flags for the driver call; this method does not expose them.
	const (
		ifUnused = false
		noWait   = false
	)

	connection, connErr := exchange.GetConn(options.ConnectionID)
	if connErr != nil {
		return connErr
	}

	channel, chanErr := connection.Channel()
	if chanErr != nil {
		return chanErr
	}
	// Closing the channel is best effort: its error is deliberately dropped so
	// the result of the delete call is what the caller sees.
	defer func() { _ = channel.Close() }()

	deleteErr := channel.ExchangeDelete(name, ifUnused, noWait)
	return deleteErr
}
// Bind subscribes the destination exchange to the source exchange. It resolves
// the connection named by options.ConnectionID, opens a channel on it for the
// duration of the call, and forwards the binding to the driver's ExchangeBind.
// Any error from resolving the connection, opening the channel, or creating
// the binding is returned unchanged.
func (exchange *Exchange) Bind(options ExchangeBindOptions) error {
	connection, connErr := exchange.GetConn(options.ConnectionID)
	if connErr != nil {
		return connErr
	}

	channel, chanErr := connection.Channel()
	if chanErr != nil {
		return chanErr
	}
	// Closing the channel is best effort: its error is deliberately dropped so
	// the result of the bind call is what the caller sees.
	defer func() { _ = channel.Close() }()

	// The driver takes its arguments as (destination, key, source, ...).
	var (
		destination = options.DestinationExchangeName
		key         = options.RoutingKey
		source      = options.SourceExchangeName
	)
	bindErr := channel.ExchangeBind(destination, key, source, options.NoWait, options.Args)
	return bindErr
}
// Unbind removes the subscription of the destination exchange to the source
// exchange. It resolves the connection named by options.ConnectionID, opens a
// channel on it for the duration of the call, and forwards the request to the
// driver's ExchangeUnbind. Any error from resolving the connection, opening
// the channel, or removing the binding is returned unchanged.
func (exchange *Exchange) Unbind(options ExchangeUnbindOptions) error {
	connection, connErr := exchange.GetConn(options.ConnectionID)
	if connErr != nil {
		return connErr
	}

	channel, chanErr := connection.Channel()
	if chanErr != nil {
		return chanErr
	}
	// Closing the channel is best effort: its error is deliberately dropped so
	// the result of the unbind call is what the caller sees.
	defer func() { _ = channel.Close() }()

	// The driver takes its arguments as (destination, key, source, ...).
	var (
		destination = options.DestinationExchangeName
		key         = options.RoutingKey
		source      = options.SourceExchangeName
	)
	unbindErr := channel.ExchangeUnbind(destination, key, source, options.NoWait, options.Args)
	return unbindErr
}