-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmakeDelegator.js
63 lines (51 loc) · 1.64 KB
/
makeDelegator.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
const amqp = require("amqplib");
const emitter = require("events");
const { v4 } = require("uuid");
const defaults = require("./defaults.js");
const {
  DELEGATOR_ALREADY_STARTED,
  DELEGATOR_NOT_STARTED,
} = require("./errors.js");
const invoker = require("./utils/invoker.js");
/**
 * Creates a delegator that sends invocations over AMQP and receives the
 * replies on a dedicated reply queue.
 *
 * Options are merged over the module `defaults`; the per-field fallbacks
 * below apply only when neither provides a value.
 *
 * @param {object} [options]
 * @param {number} [options.expires=300000] - `expires` setting passed when
 *   asserting the reply queue.
 * @param {string} [options.namespace="amqp"] - Prefix joined to the invoked
 *   name as `${namespace}.${name}`.
 * @param {Function} [options.onClose] - Listener attached to the connection's
 *   "close" event (ignored unless it is a function).
 * @param {Function} [options.onError] - Listener attached to the connection's
 *   "error" event (ignored unless it is a function).
 * @param {string} [options.replyTo] - Name of the reply queue; a fresh UUID
 *   when omitted.
 * @param {string} [options.url] - Passed to `amqp.connect`.
 * @returns {{ invoke: Function, start: Function, stop: Function }}
 */
const makeDelegator = (options = {}) => {
  const {
    expires = 300000,
    namespace = "amqp",
    onClose,
    onError,
    replyTo = v4(),
    url,
  } = { ...defaults, ...options };

  let connection;
  let channel;
  // True while a start() call is in flight, so an overlapping start() cannot
  // open a second connection before `channel` has been published.
  let starting = false;

  /**
   * Connects, opens a channel, asserts the reply queue and registers the reply
   * consumer. State is published only after every step succeeded, so a failed
   * start leaves the delegator stopped and restartable.
   *
   * @returns {Promise<void>}
   * @throws {Error} DELEGATOR_ALREADY_STARTED when already started or starting.
   */
  const start = async () => {
    if (channel || starting) throw new Error(DELEGATOR_ALREADY_STARTED);
    starting = true;

    let pendingConnection;
    try {
      pendingConnection = await amqp.connect(url);
      if (typeof onClose === "function") pendingConnection.on("close", onClose);
      if (typeof onError === "function") pendingConnection.on("error", onError);

      const pendingChannel = await pendingConnection.createChannel();
      await pendingChannel.assertQueue(replyTo, { expires });

      // Replies are re-emitted under their correlation id. Listener limit is
      // lifted because every in-flight invocation may hold a listener.
      pendingChannel.responseEmitter = new emitter();
      pendingChannel.responseEmitter.setMaxListeners(0);

      // Awaited so that start() resolves only once the consumer is registered
      // and a consume failure rejects start() instead of going unhandled.
      await pendingChannel.consume(
        replyTo,
        (message) => {
          // amqplib delivers `null` when the broker cancels the consumer.
          if (message === null) return;
          pendingChannel.responseEmitter.emit(
            message.properties.correlationId,
            JSON.parse(message.content.toString())
          );
        },
        { noAck: true }
      );

      connection = pendingConnection;
      channel = pendingChannel;
    } catch (error) {
      if (pendingConnection) {
        try {
          // Do not leak the half-initialised connection.
          await pendingConnection.close();
        } catch (closeError) {
          // Ignored on purpose: the setup failure below is what the caller
          // needs to see, not the secondary close failure.
        }
      }
      throw error;
    } finally {
      starting = false;
    }
  };

  /**
   * Invokes `${namespace}.${name}` through the started channel.
   *
   * @param {string} name - Invocation name, without the namespace prefix.
   * @param {*} args - Forwarded unchanged to the invoker.
   * @param {*} [options] - Forwarded unchanged to the invoker.
   * @returns {Promise<*>} Whatever the invoker resolves with.
   * @throws {Error} DELEGATOR_NOT_STARTED when start() has not completed.
   */
  const invoke = async (name, args, options) => {
    if (!channel) throw new Error(DELEGATOR_NOT_STARTED);
    // Each invocation gets its own correlation id.
    const invocation = invoker(channel, replyTo, v4());
    return await invocation(`${namespace}.${name}`, args, options);
  };

  /**
   * Closes the channel and then the connection. Safe to call when not started.
   * State is cleared before closing, and the connection is closed even when
   * closing the channel fails; the failure is still propagated to the caller.
   *
   * @returns {Promise<void>}
   */
  const stop = async () => {
    const closingChannel = channel;
    const closingConnection = connection;
    channel = undefined;
    connection = undefined;

    try {
      if (closingChannel) await closingChannel.close();
    } finally {
      if (closingConnection) await closingConnection.close();
    }
  };

  return { invoke, start, stop };
};
// The factory is this module's only export (CommonJS).
module.exports = makeDelegator;