-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathexchange.lisp
161 lines (144 loc) · 6.57 KB
/
exchange.lisp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
(in-package :cl-bunny)
(defclass exchange ()
  (;; NIL is the default, so the declared type has to admit it.
   (channel :type (or null channel)
            :initform nil
            :initarg :channel
            :reader exchange-channel
            :documentation "Channel this exchange object is attached to, or NIL.")
   (name :type string
         :initarg :name
         :reader exchange-name
         :documentation "Exchange name; the empty string is used for the default exchange.")
   (type :type string
         :initform "direct"
         :initarg :type
         :reader exchange-type
         :documentation "Exchange type name, \"direct\" unless specified.")
   ;; The four property slots below default to NIL so that instances
   ;; created without the corresponding initargs can still be read
   ;; through their readers instead of signalling UNBOUND-SLOT.
   (durable :initform nil
            :initarg :durable
            :reader exchange-durable-p)
   (auto-delete :initform nil
                :initarg :auto-delete
                :reader exchange-auto-delete-p)
   (internal :initform nil
             :initarg :internal
             :reader exchange-internal-p)
   (arguments :initform nil
              :initarg :arguments
              :reader exchange-arguments)
   ;; events
   ;; NOTE(review): declared :type FUNCTION but initialised with a
   ;; BUNNY-EVENT instance -- confirm BUNNY-EVENT is funcallable.
   (on-return :type function
              :initform (make-instance 'bunny-event)
              :accessor exchange-on-return
              :documentation "Event object for messages returned for this exchange."))
  (:documentation "Client-side description of an exchange: its name, type,
declaration properties, owning channel and on-return event."))
(defmethod exchange-type ((exchange string))
  "An exchange designated by a bare name string carries no type
information, so the answer is always NIL."
  nil)
(defmethod exchange-name ((exchange string))
  "A string used as an exchange designator is its own name; it is
returned unchanged."
  exchange)
(defmethod exchange-channel ((exchange string))
  "An exchange designated by a bare name string is not attached to any
channel, so the answer is always NIL."
  nil)
(defun exchange.declare (exchange &key (type "direct") (passive nil) (durable nil) (auto-delete nil) (internal nil) (nowait nil) (arguments nil) (channel *channel*))
  "Send an exchange.declare method for EXCHANGE over CHANNEL.

EXCHANGE is either a name string or an EXCHANGE instance; only its name
is put on the wire.  TYPE, PASSIVE, DURABLE, AUTO-DELETE, INTERNAL,
NOWAIT and ARGUMENTS are copied into the method as given.

The body handed to CHANNEL.SEND% yields the exchange already registered
on CHANNEL for EXCHANGE, or else a freshly created EXCHANGE instance
that is registered on CHANNEL."
  (let ((declared-name (exchange-name exchange)))
    (channel.send% channel
        (make-instance 'amqp-method-exchange-declare
                       :exchange declared-name
                       :type type
                       :passive passive
                       :durable durable
                       :auto-delete auto-delete
                       :internal internal
                       :nowait nowait
                       :arguments arguments)
      ;; Reuse the registered object when there is one; otherwise build
      ;; a new one from the requested properties and register it.
      (or (get-registered-exchange channel exchange)
          (register-exchange channel
                             (make-instance 'exchange
                                            :channel channel
                                            :name declared-name
                                            :type type
                                            :durable durable
                                            :auto-delete auto-delete
                                            :internal internal
                                            :arguments arguments))))))
(defun exchange.exists-p (name)
  "Passively declare the exchange NAME inside a WITH-CHANNEL form.

An AMQP-ERROR-NOT-FOUND condition raised by the declaration is ignored,
in which case the result is NIL; otherwise the result of
EXCHANGE.DECLARE is returned."
  (ignore-some-conditions (amqp-error-not-found)
    (with-channel ()
      (exchange.declare name :passive t))))
(defun exchange.default (&optional (channel *channel*))
  "Return the exchange object registered on CHANNEL under the empty name.

When none is registered yet, a durable EXCHANGE instance named \"\" is
created and registered on CHANNEL.  No method is sent to the broker."
  (let ((registered (get-registered-exchange channel "")))
    (if registered
        registered
        (register-exchange channel
                           (make-instance 'exchange
                                          :name ""
                                          :durable t
                                          :channel channel)))))
(defun exchange.topic (&optional exchange &rest args &key passive durable auto-delete internal arguments (channel *channel*))
  "Declare a topic exchange through EXCHANGE.DECLARE.

With EXCHANGE supplied, every keyword argument in ARGS is forwarded
unchanged.  Without it, \"amq.topic\" is declared as durable on CHANNEL
and the remaining keyword arguments are not forwarded."
  ;; These keywords are listed only so that ARGS is validated; their
  ;; values travel through ARGS.
  (declare (ignore passive durable auto-delete internal arguments))
  (apply #'exchange.declare
         (or exchange "amq.topic")
         :type "topic"
         (if exchange
             args
             ;; CHANNEL must still be honoured here; dropping it would
             ;; silently fall back to *CHANNEL*.
             (list :durable t :channel channel))))
(defun exchange.fanout (&optional exchange &rest args &key passive durable auto-delete internal arguments (channel *channel*))
  "Declare a fanout exchange through EXCHANGE.DECLARE.

With EXCHANGE supplied, every keyword argument in ARGS is forwarded
unchanged.  Without it, \"amq.fanout\" is declared as durable on CHANNEL
and the remaining keyword arguments are not forwarded."
  ;; These keywords are listed only so that ARGS is validated; their
  ;; values travel through ARGS.
  (declare (ignore passive durable auto-delete internal arguments))
  (apply #'exchange.declare
         (or exchange "amq.fanout")
         :type "fanout"
         (if exchange
             args
             ;; CHANNEL must still be honoured here; dropping it would
             ;; silently fall back to *CHANNEL*.
             (list :durable t :channel channel))))
(defun exchange.direct (&optional exchange &rest args &key passive durable auto-delete internal arguments (channel *channel*))
  "Declare a direct exchange through EXCHANGE.DECLARE.

With EXCHANGE supplied, every keyword argument in ARGS is forwarded
unchanged.  Without it, \"amq.direct\" is declared as durable on CHANNEL
and the remaining keyword arguments are not forwarded."
  ;; These keywords are listed only so that ARGS is validated; their
  ;; values travel through ARGS.
  (declare (ignore passive durable auto-delete internal arguments))
  (apply #'exchange.declare
         (or exchange "amq.direct")
         :type "direct"
         (if exchange
             args
             ;; CHANNEL must still be honoured here; dropping it would
             ;; silently fall back to *CHANNEL*.
             (list :durable t :channel channel))))
(defun exchange.headers (&optional exchange &rest args &key passive durable auto-delete internal arguments (channel *channel*))
  "Declare a headers exchange through EXCHANGE.DECLARE.

With EXCHANGE supplied, every keyword argument in ARGS is forwarded
unchanged.  Without it, \"amq.headers\" is declared as durable on CHANNEL
and the remaining keyword arguments are not forwarded."
  ;; These keywords are listed only so that ARGS is validated; their
  ;; values travel through ARGS.
  (declare (ignore passive durable auto-delete internal arguments))
  (apply #'exchange.declare
         (or exchange "amq.headers")
         :type "headers"
         (if exchange
             args
             ;; CHANNEL must still be honoured here; dropping it would
             ;; silently fall back to *CHANNEL*.
             (list :durable t :channel channel))))
(defun exchange.match ()
  "Declare the \"amq.match\" exchange as a durable headers exchange.

The channel is not passed explicitly, so EXCHANGE.DECLARE falls back to
its own default, *CHANNEL*."
  (exchange.declare "amq.match" :type "headers" :durable t))
(defun exchange.delete (exchange &key (if-unused nil) (nowait nil) (channel *channel*))
  "Send an exchange.delete method for EXCHANGE over CHANNEL.

EXCHANGE is a name string or an EXCHANGE instance; IF-UNUSED and NOWAIT
are copied into the method.  The body handed to CHANNEL.SEND%
deregisters EXCHANGE from CHANNEL and yields EXCHANGE itself."
  (channel.send% channel
      (make-instance 'amqp-method-exchange-delete
                     :exchange (exchange-name exchange)
                     :if-unused if-unused
                     :nowait nowait)
    ;; Drop the local bookkeeping entry, then hand the designator back.
    (deregister-exchange channel exchange)
    exchange))
(defun exchange.bind (destination source &key (routing-key "") (nowait nil) (arguments nil) (channel *channel*))
  "Send an exchange.bind method over CHANNEL binding DESTINATION to SOURCE.

DESTINATION and SOURCE are name strings or EXCHANGE instances; only
their names are sent.  ROUTING-KEY, NOWAIT and ARGUMENTS are copied into
the method.  The body handed to CHANNEL.SEND% yields DESTINATION."
  (channel.send% channel
      (make-instance 'amqp-method-exchange-bind
                     :destination (exchange-name destination)
                     :source (exchange-name source)
                     :routing-key routing-key
                     :nowait nowait
                     :arguments arguments)
    destination))
(defun exchange.unbind (destination source &key (routing-key "") (nowait nil) (arguments nil) (channel *channel*))
  "Send an exchange.unbind method over CHANNEL for DESTINATION and SOURCE.

DESTINATION and SOURCE are name strings or EXCHANGE instances; only
their names are sent.  ROUTING-KEY, NOWAIT and ARGUMENTS are copied into
the method.  The body handed to CHANNEL.SEND% yields DESTINATION."
  (channel.send% channel
      (make-instance 'amqp-method-exchange-unbind
                     :destination (exchange-name destination)
                     :source (exchange-name source)
                     :routing-key routing-key
                     :nowait nowait
                     :arguments arguments)
    destination))
(defmethod channel.receive (channel (method amqp-method-basic-return))
  "Dispatch a basic.return METHOD received on CHANNEL.

The method is wrapped in a RETURNED-MESSAGE.  The channel's on-return
event is fired first, then the on-return event of the exchange
registered on CHANNEL under the message's exchange name; each is fired
only when its handler list is non-empty.  When neither fires, a warning
is logged."
  (let* ((returned (make-instance 'returned-message
                                  :channel channel
                                  :reply-code (amqp-method-field-reply-code method)
                                  :reply-text (amqp-method-field-reply-text method)
                                  :exchange (amqp-method-field-exchange method)
                                  :routing-key (amqp-method-field-routing-key method)
                                  :body (amqp-method-content method)
                                  :properties (amqp-method-content-properties method)))
         (target (get-registered-exchange channel (message-exchange returned)))
         ;; The exchange may be unknown to this channel; then there is
         ;; no exchange-level event to consider.
         (exchange-event (and target
                              (exchange-on-return target)))
         (channel-event (channel-on-return channel))
         (handled nil))
    ;; Channel-level handlers run before exchange-level ones.
    (when (and channel-event
               (not (emptyp (event-handlers-list (channel-on-return% channel)))))
      (setf handled t)
      (event! channel-event returned))
    (when (and exchange-event
               (not (emptyp (event-handlers-list (exchange-on-return target)))))
      (setf handled t)
      (event! exchange-event returned))
    (unless handled
      (log:warn "Got unhandled returned message"))))