-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpatterns.js
381 lines (376 loc) · 13.8 KB
/
patterns.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
/**
* Mu.js - An elegant solution to creating Node.js microservices with RabbitMQ
*
* Created by Wes King on 9/22/15.
*
* MIT Licensed
*/
(function () {
/**
* Export our service
*
* @param namespace
* @returns {Patterns}
*/
module.exports = function (namespace) {
return new Patterns(namespace);
};
/**
* Import our dependencies
*
* Underscore.js
* Q.js
* The AMQP Callback API
*/
var amqp = require('amqplib/callback_api')
, Q = require('q')
, _ = require('underscore')
, PubSub = require('./PubSub')
, Routed = require('./Routed')
, RPC = require('./RPC')
, Topic = require('./Topic')
, WorkQueue = require('./WorkQueue');
/**
 * Fallback settings used for any configuration key the caller does not supply.
 *
 * host              - the AMQP URL handed to amqp.connect()
 * namespace_divider - the separator placed between the segments of a
 *                     namespaced queue/exchange/RPC name
 *
 * @type {{host: string, namespace_divider: string}}
 */
var defaults = {};
defaults.host = 'amqp://localhost';
defaults.namespace_divider = '.';
/**
 * Main service pattern constructor.
 *
 * Stores the namespace and the configuration on the instance and initialises
 * the two bookkeeping lists: `patterns` and `rpc_calls`.
 *
 * @param namespace - {string} - A custom namespace for the current daemon you are building
 * @param config - {object} - A list of configuration values for the application
 * @constructor
 */
function Patterns (namespace, config) {
    // namespace() hands back the instance, so the configuration call is made on it.
    var instance = this.namespace(namespace);
    instance.configuration(config);

    this.patterns = [];
    this.rpc_calls = [];
}
/**
 * Records the namespace of the service being declared.
 *
 * Note: the value is written to `this.namespace`, i.e. the same property name
 * as this method. After the first call the instance property holds the
 * namespace value and hides the prototype method on that instance; the other
 * methods in this file read `this.namespace` as the namespace value.
 *
 * @param namespace - {string} - A custom namespace for the current daemon you are building
 * @returns {Patterns}
 */
Patterns.prototype.namespace = function (namespace) {
    var self = this;
    self.namespace = namespace;
    return self;
};
/**
 * Builds out the configuration object for connecting to the RabbitMQ server.
 *
 * The result is always a fresh object: values from `config` win, and any key
 * that `config` leaves undefined is filled in from the module defaults.
 * Merging into a new `{}` target (instead of into `config` itself) means that
 *   - an omitted `config` still yields a usable configuration, and
 *   - the caller's object is never mutated.
 *
 * @param config - {object} - Optional list of configuration values for the application
 * @returns {Patterns}
 */
Patterns.prototype.configuration = function (config) {
    this.config = _.defaults({}, config, defaults);
    return this;
};
/**
 * Builds a new worker queue on the current service.
 *
 * Opens a connection, creates a WorkQueue for the namespaced queue name,
 * records it in `this.patterns` and starts consuming on that connection.
 *
 * @param queue - {string} - The namespace extension for the current work queue. Eg - it would end up being namespace.queue.
 * @returns {Promise} - Settles with whatever WorkQueue#consume returns
 */
Patterns.prototype.workQueue = function (queue) {
    var self = this;
    var queueName = self.getQueueString(queue);

    function startConsuming(connection) {
        var worker = new WorkQueue(queueName);
        self.patterns.push(worker);
        return worker.consume(connection);
    }

    return self.connect().then(startConsuming);
};
/**
 * Builds a new subscription via the publish/subscribe pattern on the current service.
 *
 * Opens a connection, creates a PubSub for the namespaced exchange name,
 * records it in `this.patterns` and starts consuming on that connection.
 *
 * @param exchange - {string} - The namespace extension for the current PubSub queue. Eg - it would end up being namespace.exchange.
 * @returns {Promise} - Settles with whatever PubSub#consume returns
 */
Patterns.prototype.pubSub = function (exchange) {
    var self = this;
    var exchangeName = self.getExchangeString(exchange);

    function startConsuming(connection) {
        var subscription = new PubSub(exchangeName);
        self.patterns.push(subscription);
        return subscription.consume(connection);
    }

    return self.connect().then(startConsuming);
};
/**
 * Builds a new routed exchange pattern on the current service.
 *
 * Opens a connection, creates a Routed pattern for the namespaced exchange
 * name and the given routes, records it in `this.patterns` and starts
 * consuming on that connection.
 *
 * @param exchange - {string} - The namespace extension for the current Routed queue.
 * @param routes - {array} - A list of routes for the daemon being defined to consume on the current namespace
 * @returns {Promise} - Settles with whatever Routed#consume returns
 */
Patterns.prototype.routed = function (exchange, routes) {
    var self = this;
    var exchangeName = self.getExchangeString(exchange);

    function startConsuming(connection) {
        var router = new Routed(exchangeName, routes);
        self.patterns.push(router);
        return router.consume(connection);
    }

    return self.connect().then(startConsuming);
};
/**
 * Builds a new topic based exchange on the current service.
 *
 * Opens a connection, creates a Topic pattern for the namespaced exchange
 * name and the given topics, records it in `this.patterns` and starts
 * consuming on that connection.
 *
 * @param exchange - {string} - The namespace extension for the current Topic based queue.
 * @param topics - {array} - A list of topics for the daemon being defined to subscribe to on the current namespace.
 * @returns {Promise} - Settles with whatever Topic#consume returns
 */
Patterns.prototype.topic = function (exchange, topics) {
    var self = this;
    var exchangeName = self.getExchangeString(exchange);

    function startConsuming(connection) {
        var subscriber = new Topic(exchangeName, topics);
        self.patterns.push(subscriber);
        return subscriber.consume(connection);
    }

    return self.connect().then(startConsuming);
};
/**
 * Builds a new RPC queue on the current service. RPC allows services to return data to the client.
 *
 * Opens a connection, creates an RPC pattern for the namespaced RPC queue
 * name, records it in `this.rpc_calls` and starts consuming on that connection.
 *
 * @param queue - {string} - The namespace extension for the current RPC method. Would look like `namespace.rpc.queue`
 * @returns {Promise} - Settles with whatever RPC#consume returns
 */
Patterns.prototype.rpc = function (queue) {
    var self = this;
    var rpcQueueName = self.getRpcString(queue);

    function startConsuming(connection) {
        var handler = new RPC(rpcQueueName);
        self.rpc_calls.push(handler);
        return handler.consume(connection);
    }

    return self.connect().then(startConsuming);
};
/**
 * Broadcast data to a particular queue.
 *
 * Opens a dedicated connection and channel, asserts the durable queue, sends
 * the JSON-encoded arguments as a persistent message and then shuts the
 * channel and connection down again.
 *
 * The returned promise is NOT terminated with .done(); callers that want
 * failures to surface must attach their own rejection handler (or call
 * .done() themselves).
 *
 * @param queue - {string} - The namespaced extension of the queue you are attempting to broadcast to
 * @param args - {*} - The payload to send; it is JSON-encoded before being sent.
 * @returns {Promise} - Fulfilled (with no value) once the connection close has been requested
 */
Patterns.prototype.broadcast = function (queue, args) {
    queue = this.getQueueString(queue);
    return this
        .connect()
        // createChannel lives on the prototype; there is no free-standing
        // `createChannel` identifier in this module.
        .then(this.createChannel)
        .spread(function (channel, conn) {
            var deferred = Q.defer();
            try {
                var input = stringifyJson(args);
                channel.assertQueue(queue, {durable: true});
                channel.sendToQueue(queue, Buffer.from(input), {persistent: true});
            } catch (e) {
                // Do not leave the connection open when the send could not be issued.
                conn.close();
                throw e;
            }
            // Close the channel first and only close the connection once the
            // channel close has completed, so the connection is not torn down
            // while the publish is still being written out.
            channel.close(function () {
                conn.close();
                deferred.resolve();
            });
            return deferred.promise;
        });
};
/**
 * Publish data to a particular exchange.
 *
 * Opens a dedicated connection and channel, asserts the non-durable fanout
 * exchange, publishes the JSON-encoded arguments with an empty routing key
 * and then shuts the channel and connection down again.
 *
 * The returned promise is NOT terminated with .done(); callers that want
 * failures to surface must attach their own rejection handler (or call
 * .done() themselves).
 *
 * @param exchange - {string} - The namespaced extension of the PubSub you are attempting to publish to
 * @param args - {*} - The payload to publish; it is JSON-encoded before being sent.
 * @returns {Promise} - Fulfilled (with no value) once the connection close has been requested
 */
Patterns.prototype.publish = function (exchange, args) {
    exchange = this.getExchangeString(exchange);
    return this
        .connect()
        // createChannel lives on the prototype; there is no free-standing
        // `createChannel` identifier in this module.
        .then(this.createChannel)
        // createChannel fulfils with [channel, connection], so the pair has to
        // be spread into the handler's two parameters.
        .spread(function (channel, conn) {
            var deferred = Q.defer();
            try {
                var input = stringifyJson(args);
                channel.assertExchange(exchange, 'fanout', {durable: false});
                channel.publish(exchange, '', Buffer.from(input));
            } catch (e) {
                // Do not leave the connection open when the publish could not be issued.
                conn.close();
                throw e;
            }
            // Close the channel first and only close the connection once the
            // channel close has completed, so the connection is not torn down
            // while the publish is still being written out.
            channel.close(function () {
                conn.close();
                deferred.resolve();
            });
            return deferred.promise;
        });
};
/**
 * Calls an RPC method on a queue with the given method name along with the arguments to pass into the method.
 *
 * The queue name is built with getRpcString() so that it honours the
 * configured `namespace_divider`, matching the name that rpc() consumes from.
 *
 * @param rpc_string - {string} - The RPC method that you are attempting to call from the RabbitMQ service
 * @param args - {*} - The arguments for the RPC method; they are JSON-encoded before being sent.
 * @returns {Promise} - The promise produced by assertRpcQueue for the reply
 */
Patterns.prototype.callRpc = function (rpc_string, args) {
    rpc_string = this.getRpcString(rpc_string);
    return this
        .rpcConnect(rpc_string, args)
        .spread(this.createRpcChannel)
        .spread(this.assertRpcQueue);
};
/**
 * Asserts an exclusive reply queue, sends the RPC request and waits for the matching reply.
 *
 * Receives its arguments via chained .spread() calls. The request is sent to
 * `rpc_string` with a generated correlation id and the reply queue's name in
 * `replyTo`; the returned promise is fulfilled with the first message on the
 * reply queue whose correlation id matches, after which the connection is
 * closed. The promise is rejected (and the connection closed) when the reply
 * queue cannot be asserted, when the request cannot be sent, or when the
 * consumer callback receives `null` instead of a message.
 *
 * @param channel - {object} - The channel object being passed down the promise chain for consumption by RPC
 * @param connection - {object} - The connection object being passed down the promise chain for consumption by RPC
 * @param rpc_string - {string} - The namespace extended RPC method being called on the daemon
 * @param args - {*} - The arguments for the RPC method; they are JSON-encoded before being sent.
 * @returns {Promise}
 */
Patterns.prototype.assertRpcQueue = function (channel, connection, rpc_string, args) {
    var deferred = Q.defer();

    // Used on failure paths only: the connection may already be unusable
    // there, and a throw from close() must not mask the original error.
    function closeQuietly() {
        try {
            connection.close();
        } catch (ignored) {
            // Nothing left to clean up.
        }
    }

    channel.assertQueue('', {exclusive: true}, function (err, q) {
        if (err) {
            closeQuietly();
            // Stop here: `q` is not available when the assert failed.
            return deferred.reject(err);
        }
        try {
            var corr = generateUuid();
            var input = stringifyJson(args);
            channel.consume(q.queue, function (msg) {
                if (msg === null) {
                    // No message was delivered, so no reply can be matched.
                    closeQuietly();
                    return deferred.reject(new Error('RPC reply consumer was cancelled before a response arrived'));
                }
                if (msg.properties.correlationId === corr) {
                    deferred.resolve(msg);
                    connection.close();
                }
            }, {noAck: true});
            channel.sendToQueue(
                rpc_string,
                Buffer.from(input),
                { correlationId: corr, replyTo: q.queue }
            );
        } catch (e) {
            // Surface the failure through the promise instead of throwing
            // from inside the library callback.
            closeQuietly();
            deferred.reject(e);
        }
    });
    return deferred.promise;
};
/**
 * The basic connect method.
 *
 * Calls amqp.connect() with the configured host and adapts its node-style
 * callback to a promise: rejected with the error, or fulfilled with the
 * connection object.
 *
 * @returns {Promise}
 */
Patterns.prototype.connect = function () {
    var pending = Q.defer();

    function onConnected(error, connection) {
        if (error) {
            pending.reject(error);
        } else {
            pending.resolve(connection);
        }
    }

    amqp.connect(this.config.host, onConnected);
    return pending.promise;
};
/**
 * The basic channel method: connect() followed by createChannel().
 *
 * @returns {Promise} - Fulfilled with what createChannel produces, i.e. the [channel, connection] pair
 */
Patterns.prototype.channel = function () {
    var connecting = this.connect();
    return connecting.then(this.createChannel);
};
/**
 * Connect variant used when calling an RPC method.
 *
 * Behaves like a plain connect, except that the promise is fulfilled with
 * the array [connection, rpc_string, args] so that the RPC name and its
 * arguments can be carried down the promise chain with .spread().
 *
 * @param rpc_string - {string} - The namespaced RPC method name to carry down the chain
 * @param args - {*} - The RPC arguments to carry down the chain
 * @returns {Promise}
 */
Patterns.prototype.rpcConnect = function (rpc_string, args) {
    var pending = Q.defer();

    function onConnected(error, connection) {
        if (error) {
            pending.reject(error);
        } else {
            pending.resolve([connection, rpc_string, args]);
        }
    }

    amqp.connect(this.config.host, onConnected);
    return pending.promise;
};
/**
 * Channel-creation variant used when calling an RPC method.
 *
 * Creates a channel on the given connection and fulfils with the array
 * [channel, connection, rpc_string, args] so the values keep travelling down
 * the promise chain via .spread(). Rejects with the error if the channel
 * cannot be created.
 *
 * @param connection - {object} - The connection object being passed down the promise chain for consumption by RPC
 * @param rpc_string - {string} - The namespaced RPC method being passed down the promise chain for consumption by RPC
 * @param args - {*} - The RPC arguments being passed down the promise chain.
 * @returns {Promise}
 */
Patterns.prototype.createRpcChannel = function (connection, rpc_string, args) {
    var pending = Q.defer();

    function onChannel(error, channel) {
        if (error) {
            pending.reject(error);
        } else {
            pending.resolve([channel, connection, rpc_string, args]);
        }
    }

    connection.createChannel(onChannel);
    return pending.promise;
};
/**
 * The basic method for creating a channel.
 *
 * Creates a channel on the given connection and fulfils with the array
 * [channel, connection], which lets the next step in a promise chain receive
 * both through .spread(). Rejects with the error if the channel cannot be
 * created.
 *
 * @param connection - {object} - The RabbitMQ connection object consumed from the previous .connect() method in promise chain
 * @returns {Promise}
 */
Patterns.prototype.createChannel = function (connection) {
    var pending = Q.defer();

    function onChannel(error, channel) {
        if (error) {
            pending.reject(error);
        } else {
            pending.resolve([channel, connection]);
        }
    }

    connection.createChannel(onChannel);
    return pending.promise;
};
/**
 * Shorthand method for building out a namespaced queue string for a daemon:
 * the namespace, the configured divider, then the queue name.
 *
 * @param queue - {string} - The designated queue name to be built into a fully namespaced string
 * @returns {string}
 */
Patterns.prototype.getQueueString = function (queue) {
    var prefix = this.namespace + this.config.namespace_divider;
    return prefix + queue;
};
/**
 * Shorthand method for building out a namespaced exchange string for a daemon:
 * the namespace, the configured divider, then the exchange name.
 *
 * @param exchange - {string} - The designated exchange name to be built into a fully namespaced string
 * @returns {string}
 */
Patterns.prototype.getExchangeString = function (exchange) {
    var prefix = this.namespace + this.config.namespace_divider;
    return prefix + exchange;
};
/**
 * Shorthand method for building out a namespaced RPC method string for a daemon:
 * the namespace, the divider, the literal segment 'rpc', the divider, then the
 * method name.
 *
 * @param rpc - {string} - The RPC method name to be built into a fully namespaced string
 * @returns {string}
 */
Patterns.prototype.getRpcString = function (rpc) {
    var divider = this.config.namespace_divider;
    var prefix = this.namespace + divider + 'rpc';
    return prefix + divider + rpc;
};
/**
 * Convert a JSON string to a JavaScript value.
 *
 * Returns whatever JSON.parse produces for valid input; when parsing throws,
 * an empty object is returned instead of propagating the error.
 *
 * @param string - {string} - The JSON text to parse
 * @returns {*}
 */
function parseJson (string) {
    var parsed = {};
    try {
        parsed = JSON.parse(string);
    } catch (ignored) {
        // Invalid JSON: keep the empty-object fallback.
    }
    return parsed;
}
/**
 * Convert a JavaScript value to a JSON string.
 *
 * Always returns a string. The empty-object document '{}' is used as the
 * fallback both when JSON.stringify throws (for example on circular
 * structures) and when it returns `undefined` without throwing, which it does
 * for `undefined` and for functions.
 *
 * @param object - {*} - The value to serialise
 * @returns {string}
 */
function stringifyJson (object) {
    var fallback = '{}';
    try {
        var json_string = JSON.stringify(object);
        return typeof json_string === 'string' ? json_string : fallback;
    } catch (e) {
        return fallback;
    }
}
/**
 * Generate a random ID for correlation strings in RabbitMQ for RPC calls.
 *
 * The ID is the string forms of three Math.random() values joined together.
 *
 * @returns {string}
 */
function generateUuid() {
    var pieces = [];
    for (var i = 0; i < 3; i += 1) {
        pieces.push(Math.random().toString());
    }
    return pieces.join('');
}
}());