From 124118854ef27e8e7e932d4921e5673d8591fd4f Mon Sep 17 00:00:00 2001 From: Matt Ross Date: Fri, 2 Jul 2021 10:15:51 -0400 Subject: [PATCH] perf: Upgrade reasonml to rescript --- examples/pubsub_publisher.re | 64 --------------- examples/pubsub_publisher.res | 53 +++++++++++++ examples/pubsub_subscriber.re | 63 --------------- examples/pubsub_subscriber.res | 51 ++++++++++++ examples/receiver.re | 49 ------------ examples/receiver.res | 38 +++++++++ examples/sender.re | 69 ----------------- examples/sender.res | 55 +++++++++++++ package-lock.json | 12 +-- package.json | 9 ++- src/AmqpConnectionManager.re | 137 --------------------------------- src/AmqpConnectionManager.rei | 112 --------------------------- src/AmqpConnectionManager.res | 124 +++++++++++++++++++++++++++++ src/AmqpConnectionManager.resi | 104 +++++++++++++++++++++++++ 14 files changed, 436 insertions(+), 504 deletions(-) delete mode 100644 examples/pubsub_publisher.re create mode 100644 examples/pubsub_publisher.res delete mode 100644 examples/pubsub_subscriber.re create mode 100644 examples/pubsub_subscriber.res delete mode 100644 examples/receiver.re create mode 100644 examples/receiver.res delete mode 100644 examples/sender.re create mode 100644 examples/sender.res delete mode 100644 src/AmqpConnectionManager.re delete mode 100644 src/AmqpConnectionManager.rei create mode 100644 src/AmqpConnectionManager.res create mode 100644 src/AmqpConnectionManager.resi diff --git a/examples/pubsub_publisher.re b/examples/pubsub_publisher.re deleted file mode 100644 index 0bed663..0000000 --- a/examples/pubsub_publisher.re +++ /dev/null @@ -1,64 +0,0 @@ -/* this is pretty much the same as the examples provided in - * node-amqp-connection-manager - * https://github.com/benbria/node-amqp-connection-manager/blob/master/examples/pubsub-publisher.js */ - -module Amqp = AmqpConnectionManager; -[@bs.val] external setTimeout: (unit => unit, int) => int = "setTimeout"; - -let exchange_name = "amqp-connection-manager-sample2-ex"; - -// Create a connetion manager -let connection = Amqp.connect([|"amqp://localhost"|], ()); -Amqp.AmqpConnectionManager.on( - connection, - `connect(_ => Js.Console.info("Connected!")), -); -Amqp.AmqpConnectionManager.on( - connection, - `disconnect(err => Js.Console.error(err)), -); - -// Create a channel wrapper -let channelWrapper = - Amqp.AmqpConnectionManager.createChannel( - connection, - { - "json": true, - "setup": channel => - Amqp.Channel.assertExchange( - channel, - exchange_name, - "topic", - Js.Obj.empty(), - ) - |> Js.Promise.then_(_ => Js.Promise.resolve()), - }, - ); - -// Send messages until someone hits CTRL-C or something goes wrong... -let rec sendMessage = () => { - Amqp.ChannelWrapper.publish( - channelWrapper, - exchange_name, - "", - {"time": Js.Date.now()}, - {"contentType": "application/json", "persistent": true}, - ) - |> Js.Promise.then_(msg => { - Js.Console.info("Message sent"); - Js.Promise.make((~resolve, ~reject as _) => - setTimeout(() => resolve(. msg), 1000) |> ignore - ); - }) - |> Js.Promise.then_(_ => sendMessage()) - |> Js.Promise.catch(err => { - Js.Console.error(err); - Amqp.ChannelWrapper.close(channelWrapper); - Amqp.AmqpConnectionManager.close(connection); - - Js.Promise.resolve(); - }); -}; - -Js.Console.info("Sending messages..."); -sendMessage(); diff --git a/examples/pubsub_publisher.res b/examples/pubsub_publisher.res new file mode 100644 index 0000000..b2a53b7 --- /dev/null +++ b/examples/pubsub_publisher.res @@ -0,0 +1,53 @@ +/* this is pretty much the same as the examples provided in + * node-amqp-connection-manager + * https://github.com/benbria/node-amqp-connection-manager/blob/master/examples/pubsub-publisher.js */ + +module Amqp = AmqpConnectionManager +@val external setTimeout: (unit => unit, int) => int = "setTimeout" + +let exchange_name = "amqp-connection-manager-sample2-ex" + +// Create a connetion manager +let connection = Amqp.connect(["amqp://localhost"], ()) +Amqp.AmqpConnectionManager.on(connection, #connect(_ => Js.Console.info("Connected!")))->ignore +Amqp.AmqpConnectionManager.on(connection, #disconnect(err => Js.Console.error(err)))->ignore + +// Create a channel wrapper +let channelWrapper = Amqp.AmqpConnectionManager.createChannel( + connection, + { + "json": true, + "setup": channel => + Amqp.Channel.assertExchange( + channel, + exchange_name, + "topic", + Js.Obj.empty(), + ) |> Js.Promise.then_(_ => Js.Promise.resolve()), + }, +) + +// Send messages until someone hits CTRL-C or something goes wrong... +let rec sendMessage = () => + Amqp.ChannelWrapper.publish( + channelWrapper, + exchange_name, + "", + {"time": Js.Date.now()}, + {"contentType": "application/json", "persistent": true}, + ) + |> Js.Promise.then_(msg => { + Js.Console.info("Message sent") + Js.Promise.make((~resolve, ~reject as _) => setTimeout(() => resolve(. msg), 1000) |> ignore) + }) + |> Js.Promise.then_(_ => sendMessage()) + |> Js.Promise.catch(err => { + Js.Console.error(err) + Amqp.ChannelWrapper.close(channelWrapper) + Amqp.AmqpConnectionManager.close(connection) + + Js.Promise.resolve() + }) + +Js.Console.info("Sending messages...") +sendMessage()->ignore diff --git a/examples/pubsub_subscriber.re b/examples/pubsub_subscriber.re deleted file mode 100644 index c1bf066..0000000 --- a/examples/pubsub_subscriber.re +++ /dev/null @@ -1,63 +0,0 @@ -/* this is pretty much the same as the examples provided in - * node-amqp-connection-manager - * https://github.com/benbria/node-amqp-connection-manager/blob/master/examples/pubsub-subscriber.js */ - -module Amqp = AmqpConnectionManager; - -let queue_name = "amqp-connection-manager-sample2"; -let exchange_name = "amqp-connection-manager-sample2-ex"; - -// Handle an incomming message. -let onMessage = (channel, msg: Amqp.Queue.message) => { - let message = msg.content->Node.Buffer.toString->Js.Json.parseExn; - Js.Console.log2("receiver: got message", message); - Amqp.Channel.ack(channel, msg); -}; - -// Create a connetion manager -let connection = Amqp.connect([|"amqp://localhost"|], ()); -Amqp.AmqpConnectionManager.on( - connection, - `connect(_ => Js.Console.info("Connected!")), -); -Amqp.AmqpConnectionManager.on( - connection, - `disconnect(err => Js.Console.error(err)), -); - -// Set up a channel listening for messages in the queue. -let channelWrapper = - Amqp.AmqpConnectionManager.createChannel( - connection, - { - "setup": channel => - // `channel` here is a regular amqplib `ConfirmChannel`. - Js.Promise.( - all([| - Amqp.Channel.assertQueue( - channel, - queue_name, - {"exclusive": true, "autoDelete": true, "durable": false}, - ) - |> then_(_ => resolve()), - Amqp.Channel.assertExchange( - channel, - exchange_name, - "topic", - Js.Obj.empty(), - ) - |> then_(_ => resolve()), - Amqp.Channel.prefetch(channel, 1), - Amqp.Channel.bindQueue(channel, queue_name, exchange_name, ""), - Amqp.Channel.consume(channel, queue_name, onMessage(channel)), - |]) - |> then_(_ => resolve()) - ), - }, - ); - -Amqp.ChannelWrapper.waitForConnect(channelWrapper) -|> Js.Promise.then_(_ => { - Js.Console.info("Listening for messages"); - Js.Promise.resolve(); - }); diff --git a/examples/pubsub_subscriber.res b/examples/pubsub_subscriber.res new file mode 100644 index 0000000..1232960 --- /dev/null +++ b/examples/pubsub_subscriber.res @@ -0,0 +1,51 @@ +/* this is pretty much the same as the examples provided in + * node-amqp-connection-manager + * https://github.com/benbria/node-amqp-connection-manager/blob/master/examples/pubsub-subscriber.js */ + +module Amqp = AmqpConnectionManager + +let queue_name = "amqp-connection-manager-sample2" +let exchange_name = "amqp-connection-manager-sample2-ex" + +// Handle an incomming message. +let onMessage = (channel, msg: Amqp.Queue.message) => { + let message = msg.content->Node.Buffer.toString->Js.Json.parseExn + Js.Console.log2("receiver: got message", message) + Amqp.Channel.ack(channel, msg) +} + +// Create a connetion manager +let connection = Amqp.connect(["amqp://localhost"], ()) +Amqp.AmqpConnectionManager.on(connection, #connect(_ => Js.Console.info("Connected!")))->ignore +Amqp.AmqpConnectionManager.on(connection, #disconnect(err => Js.Console.error(err)))->ignore + +// Set up a channel listening for messages in the queue. +let channelWrapper = Amqp.AmqpConnectionManager.createChannel( + connection, + { + "setup": channel => { + open // `channel` here is a regular amqplib `ConfirmChannel`. + Js.Promise + all([ + Amqp.Channel.assertQueue( + channel, + queue_name, + {"exclusive": true, "autoDelete": true, "durable": false}, + ) |> then_(_ => resolve()), + Amqp.Channel.assertExchange(channel, exchange_name, "topic", Js.Obj.empty()) |> then_(_ => + resolve() + ), + Amqp.Channel.prefetch(channel, 1), + Amqp.Channel.bindQueue(channel, queue_name, exchange_name, ""), + Amqp.Channel.consume(channel, queue_name, onMessage(channel)), + ]) |> then_(_ => resolve()) + }, + }, +) + +Amqp.ChannelWrapper.waitForConnect(channelWrapper) +|> Js.Promise.then_(_ => { + Js.Console.info("Listening for messages") + Js.Promise.resolve() +}) +|> ignore diff --git a/examples/receiver.re b/examples/receiver.re deleted file mode 100644 index 33fb9d9..0000000 --- a/examples/receiver.re +++ /dev/null @@ -1,49 +0,0 @@ -/* this is pretty much the same as the examples provided in - * node-amqp-connection-manager - * https://github.com/benbria/node-amqp-connection-manager/blob/master/examples/receiver.js - * */ - -module Amqp = AmqpConnectionManager; - -let queue_name = "amqp-connection-manager-sample1"; - -// Handle an incomming message. -let onMessage = (channel, msg: Amqp.Queue.message) => { - let message = msg.content->Node.Buffer.toString->Js.Json.parseExn; - Js.Console.log2("receiver: got message", message); - Amqp.Channel.ack(channel, msg); -}; - -// Create a connetion manager -let connection = Amqp.connect([|"amqp://localhost"|], ()); - -Amqp.AmqpConnectionManager.on( - connection, - `disconnect(err => Js.Console.error(err)), -) -|> ignore; - -Amqp.AmqpConnectionManager.on( - connection, - `connect(_ => Js.Console.info("connected!")), -) -|> ignore; - -// Set up a channel listening for messages in the queue. -let channelWrapper = - Amqp.AmqpConnectionManager.createChannel( - connection, - { - "setup": channel => - // `channel` here is a regular amqplib `ConfirmChannel`. - Js.Promise.( - all([| - Amqp.Channel.assertQueue(channel, queue_name, {"durable": true}) - |> then_(_ => resolve()), - Amqp.Channel.prefetch(channel, 1), - Amqp.Channel.consume(channel, queue_name, onMessage(channel)), - |]) - |> then_(_ => resolve()) - ), - }, - ); diff --git a/examples/receiver.res b/examples/receiver.res new file mode 100644 index 0000000..cbc58fc --- /dev/null +++ b/examples/receiver.res @@ -0,0 +1,38 @@ +/* this is pretty much the same as the examples provided in + * node-amqp-connection-manager + * https://github.com/benbria/node-amqp-connection-manager/blob/master/examples/receiver.js + * */ + +module Amqp = AmqpConnectionManager + +let queue_name = "amqp-connection-manager-sample1" + +// Handle an incomming message. +let onMessage = (channel, msg: Amqp.Queue.message) => { + let message = msg.content->Node.Buffer.toString->Js.Json.parseExn + Js.Console.log2("receiver: got message", message) + Amqp.Channel.ack(channel, msg) +} + +// Create a connetion manager +let connection = Amqp.connect(["amqp://localhost"], ()) + +Amqp.AmqpConnectionManager.on(connection, #disconnect(err => Js.Console.error(err))) |> ignore + +Amqp.AmqpConnectionManager.on(connection, #connect(_ => Js.Console.info("connected!"))) |> ignore + +// Set up a channel listening for messages in the queue. +let channelWrapper = Amqp.AmqpConnectionManager.createChannel( + connection, + { + "setup": channel => { + open // `channel` here is a regular amqplib `ConfirmChannel`. + Js.Promise + all([ + Amqp.Channel.assertQueue(channel, queue_name, {"durable": true}) |> then_(_ => resolve()), + Amqp.Channel.prefetch(channel, 1), + Amqp.Channel.consume(channel, queue_name, onMessage(channel)), + ]) |> then_(_ => resolve()) + }, + }, +) diff --git a/examples/sender.re b/examples/sender.re deleted file mode 100644 index 1e04a0a..0000000 --- a/examples/sender.re +++ /dev/null @@ -1,69 +0,0 @@ -/* this is pretty much the same as the examples provided in - * node-amqp-connection-manager - * https://github.com/benbria/node-amqp-connection-manager/blob/master/examples/sender.js - * */ - -module Amqp = AmqpConnectionManager; -[@bs.val] external setTimeout: (unit => unit, int) => int = "setTimeout"; - -let queue_name = "amqp-connection-manager-sample1"; - -// Create a connetion manager -let connection = Amqp.connect([|"amqp://localhost"|], ()); - -Amqp.AmqpConnectionManager.on( - connection, - `disconnect(err => Js.Console.error(err)), -) -|> ignore; - -Amqp.AmqpConnectionManager.on( - connection, - `connect(_ => Js.Console.info("connected!")), -) -|> ignore; - -// Set up a channel listening for messages in the queue. -let channelWrapper = - Amqp.AmqpConnectionManager.createChannel( - connection, - { - "json": true, - "setup": channel => - // `channel` here is a regular amqplib `ConfirmChannel`. - Js.Promise.( - all([| - Amqp.Channel.assertQueue(channel, queue_name, {"durable": true}) - |> then_(_ => resolve()), - |]) - |> then_(_ => resolve()) - ), - }, - ); - -// Send messages until someone hits CTRL-C or something goes wrong... -let rec sendMessage = () => { - Amqp.ChannelWrapper.sendToQueue( - channelWrapper, - queue_name, - {"time": Js.Date.now()}, - Js.Obj.empty(), - ) - |> Js.Promise.then_(msg => { - Js.Console.info("Message sent"); - Js.Promise.make((~resolve, ~reject as _) => - setTimeout(() => resolve(. msg), 1000) |> ignore - ); - }) - |> Js.Promise.then_(_ => sendMessage()) - |> Js.Promise.catch(err => { - Js.Console.error(err); - Amqp.ChannelWrapper.close(channelWrapper); - Amqp.AmqpConnectionManager.close(connection); - - Js.Promise.resolve(); - }); -}; - -Js.Console.info("Sending messages..."); -sendMessage(); diff --git a/examples/sender.res b/examples/sender.res new file mode 100644 index 0000000..1acb2a5 --- /dev/null +++ b/examples/sender.res @@ -0,0 +1,55 @@ +/* this is pretty much the same as the examples provided in + * node-amqp-connection-manager + * https://github.com/benbria/node-amqp-connection-manager/blob/master/examples/sender.js + * */ + +module Amqp = AmqpConnectionManager +@val external setTimeout: (unit => unit, int) => int = "setTimeout" + +let queue_name = "amqp-connection-manager-sample1" + +// Create a connetion manager +let connection = Amqp.connect(["amqp://localhost"], ()) + +Amqp.AmqpConnectionManager.on(connection, #disconnect(err => Js.Console.error(err))) |> ignore + +Amqp.AmqpConnectionManager.on(connection, #connect(_ => Js.Console.info("connected!"))) |> ignore + +// Set up a channel listening for messages in the queue. +let channelWrapper = Amqp.AmqpConnectionManager.createChannel( + connection, + { + "json": true, + "setup": channel => { + open // `channel` here is a regular amqplib `ConfirmChannel`. + Js.Promise + all([ + Amqp.Channel.assertQueue(channel, queue_name, {"durable": true}) |> then_(_ => resolve()), + ]) |> then_(_ => resolve()) + }, + }, +) + +// Send messages until someone hits CTRL-C or something goes wrong... +let rec sendMessage = () => + Amqp.ChannelWrapper.sendToQueue( + channelWrapper, + queue_name, + {"time": Js.Date.now()}, + Js.Obj.empty(), + ) + |> Js.Promise.then_(msg => { + Js.Console.info("Message sent") + Js.Promise.make((~resolve, ~reject as _) => setTimeout(() => resolve(. msg), 1000) |> ignore) + }) + |> Js.Promise.then_(_ => sendMessage()) + |> Js.Promise.catch(err => { + Js.Console.error(err) + Amqp.ChannelWrapper.close(channelWrapper) + Amqp.AmqpConnectionManager.close(connection) + + Js.Promise.resolve() + }) + +Js.Console.info("Sending messages...") +sendMessage()->ignore diff --git a/package-lock.json b/package-lock.json index fb72e0d..0c6478e 100644 --- a/package-lock.json +++ b/package-lock.json @@ -428,12 +428,6 @@ "fill-range": "^7.0.1" } }, - "bs-platform": { - "version": "7.1.1", - "resolved": "https://registry.npmjs.org/bs-platform/-/bs-platform-7.1.1.tgz", - "integrity": "sha512-ckZHR3J+yxyEKXOBHX8+hfzWG2XX5BxhQ4Iw9lulHFGYdAm9Ep9LgKkIah7G6RYADLmVfTxFE48igvY3kkkl+g==", - "dev": true - }, "callsites": { "version": "3.1.0", "resolved": "https://registry.npmjs.org/callsites/-/callsites-3.1.0.tgz", @@ -5520,6 +5514,12 @@ "integrity": "sha512-NKN5kMDylKuldxYLSUfrbo5Tuzh4hd+2E8NPPX02mZtn1VuREQToYe/ZdlJy+J3uCpfaiGF05e7B8W0iXbQHmg==", "dev": true }, + "rescript": { + "version": "9.1.4", + "resolved": "https://registry.npmjs.org/rescript/-/rescript-9.1.4.tgz", + "integrity": "sha512-aXANK4IqecJzdnDpJUsU6pxMViCR5ogAxzuqS0mOr8TloMnzAjJFu63fjD6LCkWrKAhlMkFFzQvVQYaAaVkFXw==", + "dev": true + }, "resolve": { "version": "1.19.0", "resolved": "https://registry.npmjs.org/resolve/-/resolve-1.19.0.tgz", diff --git a/package.json b/package.json index e8f7576..04d6615 100644 --- a/package.json +++ b/package.json @@ -4,11 +4,12 @@ "scripts": { "semantic-release": "semantic-release", "test": "echo \"no test specified\" && exit 0", + "format": "rescript format -all", "prebuild": "npm run clean", "prestart": "npm run clean", - "build": "bsb -make-world", - "start": "bsb -make-world -w", - "clean": "bsb -clean-world" + "build": "rescript build -with-deps", + "start": "rescript build -with-deps -w", + "clean": "rescript clean -with-deps" }, "keywords": [ "BuckleScript" @@ -16,7 +17,7 @@ "author": "", "license": "MIT", "devDependencies": { - "bs-platform": "^7.1.1", + "rescript": "^9.1.4", "semantic-release": "^17.2.3" }, "peerDependencies": { diff --git a/src/AmqpConnectionManager.re b/src/AmqpConnectionManager.re deleted file mode 100644 index 69bf7b6..0000000 --- a/src/AmqpConnectionManager.re +++ /dev/null @@ -1,137 +0,0 @@ -type url = string; -type urls = array(url); - -exception ConnectionError(Js.Exn.t); - -module Queue = { - type name = string; - type message = {content: Node.Buffer.t}; -}; - -module Exchange = { - type name = string; -}; - -module Channel = { - type t; - type name = string; - type ack = Queue.message => unit; - type nack = Queue.message => unit; - - [@bs.send] external ack: (t, Queue.message) => unit = "ack"; - [@bs.send] external nack: (t, Queue.message) => unit = "nack"; - - module Config = { - type nonrec t('a) = {.. "setup": t => Js.Promise.t(unit)} as 'a; - }; - - [@bs.send] - external assertExchange: - (t, Exchange.name, string, Js.t('options)) => Js.Promise.t(Exchange.name) = - "assertExchange"; - - [@bs.send] - external assertQueue: - (t, Queue.name, Js.t('options)) => - Js.Promise.t({ - . - "queue": string, - "messageCount": int, - "consumerCount": int, - }) = - "assertQueue"; - - [@bs.send] - external bindQueue: - (t, Queue.name, Exchange.name, string) => Js.Promise.t(unit) = - "bindQueue"; - - [@bs.send] external prefetch: (t, int) => Js.Promise.t(unit) = "prefetch"; - - [@bs.send] - external consume: (t, Queue.name, 'a => unit) => Js.Promise.t(unit) = - "consume"; -}; - -module ChannelWrapper = { - type t; - type name = string; - type routingKey = string; - type ack = Queue.message => unit; - type nack = Queue.message => unit; - - [@bs.send] external ack: (t, Queue.message) => unit = "ack"; - [@bs.send] external nack: (t, Queue.message) => unit = "nack"; - [@bs.send] external queueLength: t => int = "queueLength"; - [@bs.send] external close: t => unit = "close"; - [@bs.send] - external waitForConnect: t => Js.Promise.t(unit) = "waitForConnect"; - - [@bs.send] - external on: - ( - t, - [@bs.string] [ - | `connect(unit => unit) - | `error((Js.Exn.t, Channel.name) => unit) - | `close(unit => unit) - ] - ) => - t = - "on"; - - [@bs.send] - external publish': - (t, Exchange.name, routingKey, 'message, Js.t('options)) => - Js.Promise.t(unit) = - "publish"; - - let publish = (t, e, k, m, o) => - publish'(t, e, k, m, o) |> Js.Promise.(then_(_ => resolve(m))); - - [@bs.send] - external sendToQueue': - (t, Queue.name, 'message, Js.t('options)) => Js.Promise.t(unit) = - "sendToQueue"; - - let sendToQueue = (t, q, m, o) => - sendToQueue'(t, q, m, o) |> Js.Promise.(then_(_ => resolve(m))); -}; - -module AmqpConnectionManager = { - type t; - module Options = { - type t('connectionOptions) = Js.t('connectionOptions); - }; - - [@bs.send] external isConnected: t => bool = "isConnected"; - [@bs.send] external close: t => unit = "close"; - [@bs.send] - external on: - ( - t, - [@bs.string] [ - | `connect( - { - . - "connection": t, - "url": url, - } => - unit, - ) - | `disconnect({. "err": Js.Exn.t} => unit) - ] - ) => - t = - "on"; - - [@bs.send] - external createChannel: (t, Channel.Config.t('a)) => ChannelWrapper.t = - "createChannel"; -}; - -[@bs.module "amqp-connection-manager"] -external connect: - (urls, ~options: AmqpConnectionManager.Options.t('a)=?, unit) => - AmqpConnectionManager.t = - "connect"; diff --git a/src/AmqpConnectionManager.rei b/src/AmqpConnectionManager.rei deleted file mode 100644 index fb21d8c..0000000 --- a/src/AmqpConnectionManager.rei +++ /dev/null @@ -1,112 +0,0 @@ -type url = string; -type urls = array(url); - -exception ConnectionError(Js.Exn.t); - -module Queue: { - type name = string; - type message = {content: Node.Buffer.t}; -}; - -module Exchange: {type name = string;}; - -module Channel: { - type t; - type name = string; - type ack = Queue.message => unit; - type nack = Queue.message => unit; - - let ack: t => ack; - let nack: t => nack; - - module Config: { - type nonrec t('a) = {.. "setup": t => Js.Promise.t(unit)} as 'a; - }; - - let assertExchange: - (t, Exchange.name, string, Js.t('options)) => Js.Promise.t(Exchange.name); - let assertQueue: - (t, Queue.name, Js.t('options)) => - Js.Promise.t({ - . - "queue": string, - "messageCount": int, - "consumerCount": int, - }); - let bindQueue: (t, Queue.name, Exchange.name, string) => Js.Promise.t(unit); - let prefetch: (t, int) => Js.Promise.t(unit); - let consume: (t, Queue.name, Queue.message => unit) => Js.Promise.t(unit); -}; - -module ChannelWrapper: { - type t; - type name = string; - type routingKey = string; - type ack = Queue.message => unit; - type nack = Queue.message => unit; - - let ack: t => ack; - let nack: t => nack; - let queueLength: t => int; - let close: t => unit; - let waitForConnect: t => Js.Promise.t(unit); - - let on: - ( - t, - [ - | `connect(unit => unit) - | `error((Js.Exn.t, Channel.name) => unit) - | `close(unit => unit) - ] - ) => - t; - - let publish: - (t, Exchange.name, routingKey, 'message, Js.t('options)) => - Js.Promise.t('message); - - let sendToQueue: - (t, Queue.name, 'message, Js.t('options)) => Js.Promise.t('message); -}; - -module AmqpConnectionManager: { - type t; - module Options: {type t('connectionOptions) = Js.t('connectionOptions);}; - - /** Returns true if the AmqpConnectionManager is connected to a broker, false - * otherwise. */ - let isConnected: t => bool; - - /** Close this AmqpConnectionManager and free all associated resources. */ - let close: t => unit; - - let on: - ( - t, - [ - | `connect( - { - . - "connection": t, - "url": url, - } => - unit, - ) - | `disconnect({. "err": Js.Exn.t} => unit) - ] - ) => - t; - - /** Create a new ChannelWrapper. This is a proxy for the actual channel (which - * may or may not exist at any moment, depending on whether or not we are - * currently connected.) */ - let createChannel: (t, Channel.Config.t('a)) => ChannelWrapper.t; -}; - -/** Creates a new AmqpConnectionManager, which will connect to one of the URLs - * provided in `urls`. If a broker is unreachable or dies, then - * AmqpConnectionManager will try the next available broker, round-robin. */ -let connect: - (urls, ~options: AmqpConnectionManager.Options.t('a)=?, unit) => - AmqpConnectionManager.t; diff --git a/src/AmqpConnectionManager.res b/src/AmqpConnectionManager.res new file mode 100644 index 0000000..d2e8809 --- /dev/null +++ b/src/AmqpConnectionManager.res @@ -0,0 +1,124 @@ +type url = string +type urls = array + +exception ConnectionError(Js.Exn.t) + +module Queue = { + type name = string + type message = {content: Node.Buffer.t} +} + +module Exchange = { + type name = string +} + +module Channel = { + type t + type name = string + type ack = Queue.message => unit + type nack = Queue.message => unit + + @send external ack: (t, Queue.message) => unit = "ack" + @send external nack: (t, Queue.message) => unit = "nack" + + module Config = { + type t<'a> = {.."setup": t => Js.Promise.t} as 'a + } + + @send + external assertExchange: (t, Exchange.name, string, 'options) => Js.Promise.t = + "assertExchange" + + @send + external assertQueue: ( + t, + Queue.name, + 'options, + ) => Js.Promise.t<{ + "queue": string, + "messageCount": int, + "consumerCount": int, + }> = "assertQueue" + + @send + external bindQueue: (t, Queue.name, Exchange.name, string) => Js.Promise.t = "bindQueue" + + @send external prefetch: (t, int) => Js.Promise.t = "prefetch" + + @send + external consume: (t, Queue.name, 'a => unit) => Js.Promise.t = "consume" +} + +module ChannelWrapper = { + type t + type name = string + type routingKey = string + type ack = Queue.message => unit + type nack = Queue.message => unit + + @send external ack: (t, Queue.message) => unit = "ack" + @send external nack: (t, Queue.message) => unit = "nack" + @send external queueLength: t => int = "queueLength" + @send external close: t => unit = "close" + @send + external waitForConnect: t => Js.Promise.t = "waitForConnect" + + @send + external on: ( + t, + @string + [ + | #connect(unit => unit) + | #error((Js.Exn.t, Channel.name) => unit) + | #close(unit => unit) + ], + ) => t = "on" + + @send + external publish': (t, Exchange.name, routingKey, 'message, 'options) => Js.Promise.t = + "publish" + + let publish = (t, e, k, m, o) => + publish'(t, e, k, m, o) |> { + open Js.Promise + then_(_ => resolve(m)) + } + + @send + external sendToQueue': (t, Queue.name, 'message, 'options) => Js.Promise.t = "sendToQueue" + + let sendToQueue = (t, q, m, o) => + sendToQueue'(t, q, m, o) |> { + open Js.Promise + then_(_ => resolve(m)) + } +} + +module AmqpConnectionManager = { + type t + module Options = { + type t<'connectionOptions> = 'connectionOptions + } + + @send external isConnected: t => bool = "isConnected" + @send external close: t => unit = "close" + @send + external on: ( + t, + @string + [ + | #connect({"connection": t, "url": url} => unit) + | #disconnect({"err": Js.Exn.t} => unit) + ], + ) => t = "on" + + @send + external createChannel: (t, Channel.Config.t<'a>) => ChannelWrapper.t = "createChannel" +} + +@module("amqp-connection-manager") +external connect: ( + urls, + ~options: AmqpConnectionManager.Options.t<'a>=?, + unit, +) => AmqpConnectionManager.t = "connect" diff --git a/src/AmqpConnectionManager.resi b/src/AmqpConnectionManager.resi new file mode 100644 index 0000000..d6220c3 --- /dev/null +++ b/src/AmqpConnectionManager.resi @@ -0,0 +1,104 @@ +type url = string +type urls = array + +exception ConnectionError(Js.Exn.t) + +module Queue: { + type name = string + type message = {content: Node.Buffer.t} +} + +module Exchange: { + type name = string +} + +module Channel: { + type t + type name = string + type ack = Queue.message => unit + type nack = Queue.message => unit + + let ack: t => ack + let nack: t => nack + + module Config: { + type t<'a> = {.."setup": t => Js.Promise.t} as 'a + } + + let assertExchange: (t, Exchange.name, string, 'options) => Js.Promise.t + let assertQueue: ( + t, + Queue.name, + 'options, + ) => Js.Promise.t<{ + "queue": string, + "messageCount": int, + "consumerCount": int, + }> + let bindQueue: (t, Queue.name, Exchange.name, string) => Js.Promise.t + let prefetch: (t, int) => Js.Promise.t + let consume: (t, Queue.name, Queue.message => unit) => Js.Promise.t +} + +module ChannelWrapper: { + type t + type name = string + type routingKey = string + type ack = Queue.message => unit + type nack = Queue.message => unit + + let ack: t => ack + let nack: t => nack + let queueLength: t => int + let close: t => unit + let waitForConnect: t => Js.Promise.t + + let on: ( + t, + [ + | #connect(unit => unit) + | #error((Js.Exn.t, Channel.name) => unit) + | #close(unit => unit) + ], + ) => t + + let publish: (t, Exchange.name, routingKey, 'message, 'options) => Js.Promise.t<'message> + + let sendToQueue: (t, Queue.name, 'message, 'options) => Js.Promise.t<'message> +} + +module AmqpConnectionManager: { + type t + module Options: { + type t<'connectionOptions> = 'connectionOptions + } + + @ocaml.doc(" Returns true if the AmqpConnectionManager is connected to a broker, false + * otherwise. ") + let isConnected: t => bool + + @ocaml.doc(" Close this AmqpConnectionManager and free all associated resources. ") + let close: t => unit + + let on: ( + t, + [ + | #connect({"connection": t, "url": url} => unit) + | #disconnect({"err": Js.Exn.t} => unit) + ], + ) => t + + @ocaml.doc(" Create a new ChannelWrapper. This is a proxy for the actual channel (which + * may or may not exist at any moment, depending on whether or not we are + * currently connected.) ") + let createChannel: (t, Channel.Config.t<'a>) => ChannelWrapper.t +} + +@ocaml.doc(" Creates a new AmqpConnectionManager, which will connect to one of the URLs + * provided in `urls`. If a broker is unreachable or dies, then + * AmqpConnectionManager will try the next available broker, round-robin. ") +let connect: ( + urls, + ~options: AmqpConnectionManager.Options.t<'a>=?, + unit, +) => AmqpConnectionManager.t