-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
14 changed files
with
436 additions
and
504 deletions.
There are no files selected for viewing
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
/* this is pretty much the same as the examples provided in | ||
* node-amqp-connection-manager | ||
* https://github.com/benbria/node-amqp-connection-manager/blob/master/examples/pubsub-publisher.js */ | ||
|
||
module Amqp = AmqpConnectionManager | ||
@val external setTimeout: (unit => unit, int) => int = "setTimeout" | ||
|
||
let exchange_name = "amqp-connection-manager-sample2-ex" | ||
|
||
// Create a connetion manager | ||
let connection = Amqp.connect(["amqp://localhost"], ()) | ||
Amqp.AmqpConnectionManager.on(connection, #connect(_ => Js.Console.info("Connected!")))->ignore | ||
Amqp.AmqpConnectionManager.on(connection, #disconnect(err => Js.Console.error(err)))->ignore | ||
|
||
// Create a channel wrapper | ||
let channelWrapper = Amqp.AmqpConnectionManager.createChannel( | ||
connection, | ||
{ | ||
"json": true, | ||
"setup": channel => | ||
Amqp.Channel.assertExchange( | ||
channel, | ||
exchange_name, | ||
"topic", | ||
Js.Obj.empty(), | ||
) |> Js.Promise.then_(_ => Js.Promise.resolve()), | ||
}, | ||
) | ||
|
||
// Send messages until someone hits CTRL-C or something goes wrong... | ||
let rec sendMessage = () => | ||
Amqp.ChannelWrapper.publish( | ||
channelWrapper, | ||
exchange_name, | ||
"", | ||
{"time": Js.Date.now()}, | ||
{"contentType": "application/json", "persistent": true}, | ||
) | ||
|> Js.Promise.then_(msg => { | ||
Js.Console.info("Message sent") | ||
Js.Promise.make((~resolve, ~reject as _) => setTimeout(() => resolve(. msg), 1000) |> ignore) | ||
}) | ||
|> Js.Promise.then_(_ => sendMessage()) | ||
|> Js.Promise.catch(err => { | ||
Js.Console.error(err) | ||
Amqp.ChannelWrapper.close(channelWrapper) | ||
Amqp.AmqpConnectionManager.close(connection) | ||
|
||
Js.Promise.resolve() | ||
}) | ||
|
||
Js.Console.info("Sending messages...") | ||
sendMessage()->ignore |
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
/* this is pretty much the same as the examples provided in | ||
* node-amqp-connection-manager | ||
* https://github.com/benbria/node-amqp-connection-manager/blob/master/examples/pubsub-subscriber.js */ | ||
|
||
module Amqp = AmqpConnectionManager | ||
|
||
let queue_name = "amqp-connection-manager-sample2" | ||
let exchange_name = "amqp-connection-manager-sample2-ex" | ||
|
||
// Handle an incomming message. | ||
let onMessage = (channel, msg: Amqp.Queue.message) => { | ||
let message = msg.content->Node.Buffer.toString->Js.Json.parseExn | ||
Js.Console.log2("receiver: got message", message) | ||
Amqp.Channel.ack(channel, msg) | ||
} | ||
|
||
// Create a connetion manager | ||
let connection = Amqp.connect(["amqp://localhost"], ()) | ||
Amqp.AmqpConnectionManager.on(connection, #connect(_ => Js.Console.info("Connected!")))->ignore | ||
Amqp.AmqpConnectionManager.on(connection, #disconnect(err => Js.Console.error(err)))->ignore | ||
|
||
// Set up a channel listening for messages in the queue. | ||
let channelWrapper = Amqp.AmqpConnectionManager.createChannel( | ||
connection, | ||
{ | ||
"setup": channel => { | ||
open // `channel` here is a regular amqplib `ConfirmChannel`. | ||
Js.Promise | ||
all([ | ||
Amqp.Channel.assertQueue( | ||
channel, | ||
queue_name, | ||
{"exclusive": true, "autoDelete": true, "durable": false}, | ||
) |> then_(_ => resolve()), | ||
Amqp.Channel.assertExchange(channel, exchange_name, "topic", Js.Obj.empty()) |> then_(_ => | ||
resolve() | ||
), | ||
Amqp.Channel.prefetch(channel, 1), | ||
Amqp.Channel.bindQueue(channel, queue_name, exchange_name, ""), | ||
Amqp.Channel.consume(channel, queue_name, onMessage(channel)), | ||
]) |> then_(_ => resolve()) | ||
}, | ||
}, | ||
) | ||
|
||
Amqp.ChannelWrapper.waitForConnect(channelWrapper) | ||
|> Js.Promise.then_(_ => { | ||
Js.Console.info("Listening for messages") | ||
Js.Promise.resolve() | ||
}) | ||
|> ignore |
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
/* this is pretty much the same as the examples provided in | ||
* node-amqp-connection-manager | ||
* https://github.com/benbria/node-amqp-connection-manager/blob/master/examples/receiver.js | ||
* */ | ||
|
||
module Amqp = AmqpConnectionManager | ||
|
||
let queue_name = "amqp-connection-manager-sample1" | ||
|
||
// Handle an incomming message. | ||
let onMessage = (channel, msg: Amqp.Queue.message) => { | ||
let message = msg.content->Node.Buffer.toString->Js.Json.parseExn | ||
Js.Console.log2("receiver: got message", message) | ||
Amqp.Channel.ack(channel, msg) | ||
} | ||
|
||
// Create a connetion manager | ||
let connection = Amqp.connect(["amqp://localhost"], ()) | ||
|
||
Amqp.AmqpConnectionManager.on(connection, #disconnect(err => Js.Console.error(err))) |> ignore | ||
|
||
Amqp.AmqpConnectionManager.on(connection, #connect(_ => Js.Console.info("connected!"))) |> ignore | ||
|
||
// Set up a channel listening for messages in the queue. | ||
let channelWrapper = Amqp.AmqpConnectionManager.createChannel( | ||
connection, | ||
{ | ||
"setup": channel => { | ||
open // `channel` here is a regular amqplib `ConfirmChannel`. | ||
Js.Promise | ||
all([ | ||
Amqp.Channel.assertQueue(channel, queue_name, {"durable": true}) |> then_(_ => resolve()), | ||
Amqp.Channel.prefetch(channel, 1), | ||
Amqp.Channel.consume(channel, queue_name, onMessage(channel)), | ||
]) |> then_(_ => resolve()) | ||
}, | ||
}, | ||
) |
This file was deleted.
Oops, something went wrong.
Oops, something went wrong.