From a06ef44efe30434e0cb0fb407f8769fcd09c0a8e Mon Sep 17 00:00:00 2001 From: Stephen Cresswell <229672+cressie176@users.noreply.github.com> Date: Thu, 22 Feb 2024 08:26:47 +0000 Subject: [PATCH 1/3] Add connection-update-secret --- Makefile | 5 +- lib/callback_model.js | 4 + lib/channel_model.js | 4 + lib/connection.js | 11 + test/callback_api.js | 356 +++++++++-------- test/channel_api.js | 900 +++++++++++++++++++++--------------------- 6 files changed, 658 insertions(+), 622 deletions(-) diff --git a/Makefile b/Makefile index db94e70a..a3d87428 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,6 @@ -RABBITMQ_SRC_VERSION=rabbitmq_v3_2_1 +RABBITMQ_SRC_VERSION=v3.12.13 JSON=amqp-rabbitmq-0.9.1.json -RABBITMQ_CODEGEN=https://raw.githubusercontent.com/rabbitmq/rabbitmq-codegen -AMQP_JSON=$(RABBITMQ_CODEGEN)/$(RABBITMQ_SRC_VERSION)/$(JSON) +AMQP_JSON=https://raw.githubusercontent.com/rabbitmq/rabbitmq-server/$(RABBITMQ_SRC_VERSION)/deps/rabbitmq_codegen/$(JSON) NODEJS_VERSIONS='10.21' '11.15' '12.18' '13.14' '14.5' '15.8' '16.3.0' '18.1.0' '20.10.0' diff --git a/lib/callback_model.js b/lib/callback_model.js index aad24150..45b5de0f 100644 --- a/lib/callback_model.js +++ b/lib/callback_model.js @@ -24,6 +24,10 @@ class CallbackModel extends EventEmitter { this.connection.close(cb); } + updateSecret(newSecret, reason, cb) { + this.connection._updateSecret(newSecret, reason, cb); + } + createChannel (cb) { var ch = new Channel(this.connection); ch.open(function (err, ok) { diff --git a/lib/channel_model.js b/lib/channel_model.js index be38d00c..f95b0192 100644 --- a/lib/channel_model.js +++ b/lib/channel_model.js @@ -26,6 +26,10 @@ class ChannelModel extends EventEmitter { return promisify(this.connection.close.bind(this.connection))(); } + updateSecret(newSecret, reason) { + return promisify(this.connection._updateSecret.bind(this.connection))(newSecret, reason); + } + async createChannel() { const channel = new Channel(this.connection); await channel.open(); diff --git a/lib/connection.js b/lib/connection.js index f3cae69c..3586fdfd 100644 --- a/lib/connection.js +++ b/lib/connection.js @@ -361,6 +361,14 @@ class Connection extends EventEmitter { this.emit('close', maybeErr); } + _updateSecret(newSecret, reason, cb) { + this.sendMethod(0, defs.ConnectionUpdateSecret, { + newSecret, + reason + }); + this.on('update-secret-ok', cb); + } + // === startHeartbeater () { if (this.heartbeat === 0) @@ -611,6 +619,9 @@ function channel0(connection) { else if (f.id === defs.ConnectionUnblocked) { connection.emit('unblocked'); } + else if (f.id === defs.ConnectionUpdateSecretOk) { + connection.emit('update-secret-ok'); + } else { connection.closeWithError( fmt("Unexpected frame on channel 0"), diff --git a/test/callback_api.js b/test/callback_api.js index 21f3a900..29a9f409 100644 --- a/test/callback_api.js +++ b/test/callback_api.js @@ -57,10 +57,18 @@ function waitForMessages(ch, q, k) { suite('connect', function() { -test('at all', function(done) { - connect(doneCallback(done)); + test('at all', function(done) { + connect(doneCallback(done)); + }); + }); +suite('updateSecret', function() { + test('updateSecret', function(done) { + connect(kCallback(function(c) { + c.updateSecret(Buffer.from('new secret'), 'no reason', doneCallback(done)); + })); + }); }); function channel_test_fn(method) { @@ -79,234 +87,234 @@ var confirm_channel_test = channel_test_fn('createConfirmChannel'); suite('channel open', function() { -channel_test('at all', function(ch, done) { - done(); -}); + channel_test('at all', function(ch, done) { + done(); + }); -channel_test('open and close', function(ch, done) { - ch.close(doneCallback(done)); -}); + channel_test('open and close', function(ch, done) { + ch.close(doneCallback(done)); + }); }); suite('assert, check, delete', function() { -channel_test('assert, check, delete queue', function(ch, done) { - ch.assertQueue('test.cb.queue', {}, kCallback(function(q) { - ch.checkQueue('test.cb.queue', kCallback(function(ok) { - ch.deleteQueue('test.cb.queue', {}, doneCallback(done)); + channel_test('assert, check, delete queue', function(ch, done) { + ch.assertQueue('test.cb.queue', {}, kCallback(function(q) { + ch.checkQueue('test.cb.queue', kCallback(function(ok) { + ch.deleteQueue('test.cb.queue', {}, doneCallback(done)); + }, done)); }, done)); - }, done)); -}); + }); -channel_test('assert, check, delete exchange', function(ch, done) { - ch.assertExchange( - 'test.cb.exchange', 'topic', {}, kCallback(function(ex) { - ch.checkExchange('test.cb.exchange', kCallback(function(ok) { - ch.deleteExchange('test.cb.exchange', {}, doneCallback(done)); + channel_test('assert, check, delete exchange', function(ch, done) { + ch.assertExchange( + 'test.cb.exchange', 'topic', {}, kCallback(function(ex) { + ch.checkExchange('test.cb.exchange', kCallback(function(ok) { + ch.deleteExchange('test.cb.exchange', {}, doneCallback(done)); + }, done)); }, done)); - }, done)); -}); + }); -channel_test('fail on check non-queue', function(ch, done) { - var both = twice(done); - ch.on('error', failCallback(both.first)); - ch.checkQueue('test.cb.nothere', failCallback(both.second)); -}); + channel_test('fail on check non-queue', function(ch, done) { + var both = twice(done); + ch.on('error', failCallback(both.first)); + ch.checkQueue('test.cb.nothere', failCallback(both.second)); + }); -channel_test('fail on check non-exchange', function(ch, done) { - var both = twice(done); - ch.on('error', failCallback(both.first)); - ch.checkExchange('test.cb.nothere', failCallback(both.second)); -}); + channel_test('fail on check non-exchange', function(ch, done) { + var both = twice(done); + ch.on('error', failCallback(both.first)); + ch.checkExchange('test.cb.nothere', failCallback(both.second)); + }); }); suite('bindings', function() { -channel_test('bind queue', function(ch, done) { - ch.assertQueue('test.cb.bindq', {}, kCallback(function(q) { - ch.assertExchange( - 'test.cb.bindex', 'fanout', {}, kCallback(function(ex) { - ch.bindQueue(q.queue, ex.exchange, '', {}, - doneCallback(done)); - }, done)); - }, done)); -}); - -channel_test('bind exchange', function(ch, done) { - ch.assertExchange( - 'test.cb.bindex1', 'fanout', {}, kCallback(function(ex1) { + channel_test('bind queue', function(ch, done) { + ch.assertQueue('test.cb.bindq', {}, kCallback(function(q) { ch.assertExchange( - 'test.cb.bindex2', 'fanout', {}, kCallback(function(ex2) { - ch.bindExchange(ex1.exchange, - ex2.exchange, '', {}, - doneCallback(done)); + 'test.cb.bindex', 'fanout', {}, kCallback(function(ex) { + ch.bindQueue(q.queue, ex.exchange, '', {}, + doneCallback(done)); }, done)); }, done)); -}); + }); + + channel_test('bind exchange', function(ch, done) { + ch.assertExchange( + 'test.cb.bindex1', 'fanout', {}, kCallback(function(ex1) { + ch.assertExchange( + 'test.cb.bindex2', 'fanout', {}, kCallback(function(ex2) { + ch.bindExchange(ex1.exchange, + ex2.exchange, '', {}, + doneCallback(done)); + }, done)); + }, done)); + }); }); suite('sending messages', function() { -channel_test('send to queue and consume noAck', function(ch, done) { - var msg = randomString(); - ch.assertQueue('', {exclusive: true}, function(e, q) { - if (e !== null) return done(e); - ch.consume(q.queue, function(m) { - if (m.content.toString() == msg) done(); - else done(new Error("message content doesn't match:" + - msg + " =/= " + m.content.toString())); - }, {noAck: true, exclusive: true}); - ch.sendToQueue(q.queue, Buffer.from(msg)); + channel_test('send to queue and consume noAck', function(ch, done) { + var msg = randomString(); + ch.assertQueue('', {exclusive: true}, function(e, q) { + if (e !== null) return done(e); + ch.consume(q.queue, function(m) { + if (m.content.toString() == msg) done(); + else done(new Error("message content doesn't match:" + + msg + " =/= " + m.content.toString())); + }, {noAck: true, exclusive: true}); + ch.sendToQueue(q.queue, Buffer.from(msg)); + }); }); -}); -channel_test('send to queue and consume ack', function(ch, done) { - var msg = randomString(); - ch.assertQueue('', {exclusive: true}, function(e, q) { - if (e !== null) return done(e); - ch.consume(q.queue, function(m) { - if (m.content.toString() == msg) { - ch.ack(m); - done(); - } - else done(new Error("message content doesn't match:" + - msg + " =/= " + m.content.toString())); - }, {noAck: false, exclusive: true}); - ch.sendToQueue(q.queue, Buffer.from(msg)); + channel_test('send to queue and consume ack', function(ch, done) { + var msg = randomString(); + ch.assertQueue('', {exclusive: true}, function(e, q) { + if (e !== null) return done(e); + ch.consume(q.queue, function(m) { + if (m.content.toString() == msg) { + ch.ack(m); + done(); + } + else done(new Error("message content doesn't match:" + + msg + " =/= " + m.content.toString())); + }, {noAck: false, exclusive: true}); + ch.sendToQueue(q.queue, Buffer.from(msg)); + }); }); -}); -channel_test('send to and get from queue', function(ch, done) { - ch.assertQueue('', {exclusive: true}, function(e, q) { - if (e != null) return done(e); - var msg = randomString(); - ch.sendToQueue(q.queue, Buffer.from(msg)); - waitForMessages(ch, q.queue, function(e, _) { + channel_test('send to and get from queue', function(ch, done) { + ch.assertQueue('', {exclusive: true}, function(e, q) { if (e != null) return done(e); - ch.get(q.queue, {noAck: true}, function(e, m) { - if (e != null) - return done(e); - else if (!m) - return done(new Error('Empty (false) not expected')); - else if (m.content.toString() == msg) - return done(); - else - return done( - new Error('Messages do not match: ' + - msg + ' =/= ' + m.content.toString())); + var msg = randomString(); + ch.sendToQueue(q.queue, Buffer.from(msg)); + waitForMessages(ch, q.queue, function(e, _) { + if (e != null) return done(e); + ch.get(q.queue, {noAck: true}, function(e, m) { + if (e != null) + return done(e); + else if (!m) + return done(new Error('Empty (false) not expected')); + else if (m.content.toString() == msg) + return done(); + else + return done( + new Error('Messages do not match: ' + + msg + ' =/= ' + m.content.toString())); + }); }); }); }); -}); }); suite('ConfirmChannel', function() { -confirm_channel_test('Receive confirmation', function(ch, done) { - // An unroutable message, on the basis that you're not allowed a - // queue with an empty name, and you can't make bindings to the - // default exchange. Tricky eh? - ch.publish('', '', Buffer.from('foo'), {}, done); -}); + confirm_channel_test('Receive confirmation', function(ch, done) { + // An unroutable message, on the basis that you're not allowed a + // queue with an empty name, and you can't make bindings to the + // default exchange. Tricky eh? + ch.publish('', '', Buffer.from('foo'), {}, done); + }); -confirm_channel_test('Wait for confirms', function(ch, done) { - for (var i=0; i < 1000; i++) { - ch.publish('', '', Buffer.from('foo'), {}); - } - ch.waitForConfirms(done); -}); + confirm_channel_test('Wait for confirms', function(ch, done) { + for (var i=0; i < 1000; i++) { + ch.publish('', '', Buffer.from('foo'), {}); + } + ch.waitForConfirms(done); + }); }); suite("Error handling", function() { -/* -I don't like having to do this, but there appears to be something -broken about domains in Node.JS v0.8 and mocha. Apparently it has to -do with how mocha and domains hook into error propogation: -https://github.com/visionmedia/mocha/issues/513 (summary: domains in -Node.JS v0.8 don't prevent uncaughtException from firing, and that's -what mocha uses to detect .. an uncaught exception). - -Using domains with amqplib *does* work in practice in Node.JS v0.8: -that is, it's possible to throw an exception in a callback and deal -with it in the active domain, and thereby avoid it crashing the -program. - */ -if (util.versionGreaterThan(process.versions.node, '0.8')) { - test('Throw error in connection open callback', function(done) { - var dom = domain.createDomain(); - dom.on('error', failCallback(done)); - connect(dom.bind(function(err, conn) { - throw new Error('Spurious connection open callback error'); - })); - }); -} + /* + I don't like having to do this, but there appears to be something + broken about domains in Node.JS v0.8 and mocha. Apparently it has to + do with how mocha and domains hook into error propogation: + https://github.com/visionmedia/mocha/issues/513 (summary: domains in + Node.JS v0.8 don't prevent uncaughtException from firing, and that's + what mocha uses to detect .. an uncaught exception). + + Using domains with amqplib *does* work in practice in Node.JS v0.8: + that is, it's possible to throw an exception in a callback and deal + with it in the active domain, and thereby avoid it crashing the + program. + */ + if (util.versionGreaterThan(process.versions.node, '0.8')) { + test('Throw error in connection open callback', function(done) { + var dom = domain.createDomain(); + dom.on('error', failCallback(done)); + connect(dom.bind(function(err, conn) { + throw new Error('Spurious connection open callback error'); + })); + }); + } -// TODO: refactor {error_test, channel_test} -function error_test(name, fun) { - test(name, function(done) { - var dom = domain.createDomain(); - dom.run(function() { - connect(kCallback(function(c) { - // Seems like there were some unironed wrinkles in 0.8's - // implementation of domains; explicitly adding the connection - // to the domain makes sure any exception thrown in the course - // of processing frames is handled by the domain. For other - // versions of Node.JS, this ends up being belt-and-braces. - dom.add(c); - c.createChannel(kCallback(function(ch) { - fun(ch, done, dom); + // TODO: refactor {error_test, channel_test} + function error_test(name, fun) { + test(name, function(done) { + var dom = domain.createDomain(); + dom.run(function() { + connect(kCallback(function(c) { + // Seems like there were some unironed wrinkles in 0.8's + // implementation of domains; explicitly adding the connection + // to the domain makes sure any exception thrown in the course + // of processing frames is handled by the domain. For other + // versions of Node.JS, this ends up being belt-and-braces. + dom.add(c); + c.createChannel(kCallback(function(ch) { + fun(ch, done, dom); + }, done)); }, done)); - }, done)); + }); }); - }); -} - -error_test('Channel open callback throws an error', function(ch, done, dom) { - dom.on('error', failCallback(done)); - throw new Error('Error in open callback'); -}); + } -error_test('RPC callback throws error', function(ch, done, dom) { - dom.on('error', failCallback(done)); - ch.prefetch(0, false, function(err, ok) { - throw new Error('Spurious callback error'); + error_test('Channel open callback throws an error', function(ch, done, dom) { + dom.on('error', failCallback(done)); + throw new Error('Error in open callback'); }); -}); -error_test('Get callback throws error', function(ch, done, dom) { - dom.on('error', failCallback(done)); - ch.assertQueue('test.cb.get-with-error', {}, function(err, ok) { - ch.get('test.cb.get-with-error', {noAck: true}, function() { + error_test('RPC callback throws error', function(ch, done, dom) { + dom.on('error', failCallback(done)); + ch.prefetch(0, false, function(err, ok) { throw new Error('Spurious callback error'); }); }); -}); -error_test('Consume callback throws error', function(ch, done, dom) { - dom.on('error', failCallback(done)); - ch.assertQueue('test.cb.consume-with-error', {}, function(err, ok) { - ch.consume('test.cb.consume-with-error', ignore, {noAck: true}, function() { - throw new Error('Spurious callback error'); + error_test('Get callback throws error', function(ch, done, dom) { + dom.on('error', failCallback(done)); + ch.assertQueue('test.cb.get-with-error', {}, function(err, ok) { + ch.get('test.cb.get-with-error', {noAck: true}, function() { + throw new Error('Spurious callback error'); + }); }); }); -}); -error_test('Get from non-queue invokes error k', function(ch, done, dom) { - var both = twice(failCallback(done)); - dom.on('error', both.first); - ch.get('', {}, both.second); -}); + error_test('Consume callback throws error', function(ch, done, dom) { + dom.on('error', failCallback(done)); + ch.assertQueue('test.cb.consume-with-error', {}, function(err, ok) { + ch.consume('test.cb.consume-with-error', ignore, {noAck: true}, function() { + throw new Error('Spurious callback error'); + }); + }); + }); -error_test('Consume from non-queue invokes error k', function(ch, done, dom) { - var both = twice(failCallback(done)); - dom.on('error', both.first); - ch.consume('', both.second); -}); + error_test('Get from non-queue invokes error k', function(ch, done, dom) { + var both = twice(failCallback(done)); + dom.on('error', both.first); + ch.get('', {}, both.second); + }); + + error_test('Consume from non-queue invokes error k', function(ch, done, dom) { + var both = twice(failCallback(done)); + dom.on('error', both.first); + ch.consume('', both.second); + }); }); diff --git a/test/channel_api.js b/test/channel_api.js index b4952e27..77ed496a 100644 --- a/test/channel_api.js +++ b/test/channel_api.js @@ -49,94 +49,104 @@ var chtest = channel_test.bind(null, 'createChannel'); suite("connect", function() { -test("at all", function(done) { - connect(URL).then(function(c) { - return c.close() - ;}).then(succeed(done), fail(done)); -}); + test("at all", function(done) { + connect(URL).then(function(c) { + return c.close() + ;}).then(succeed(done), fail(done)); + }); -chtest("create channel", ignore); // i.e., just don't bork + chtest("create channel", ignore); // i.e., just don't bork }); +suite('updateSecret', function() { + test('updateSecret', function(done) { + connect().then(function(c) { + c.updateSecret(Buffer.from('new secret'), 'no reason') + .then(succeed(done), fail(done)) + .finally(function() { c.close(); }); + }); + }); +}); + var QUEUE_OPTS = {durable: false}; var EX_OPTS = {durable: false}; suite("assert, check, delete", function() { -chtest("assert and check queue", function(ch) { - return ch.assertQueue('test.check-queue', QUEUE_OPTS) - .then(function(qok) { - return ch.checkQueue('test.check-queue'); - }); -}); + chtest("assert and check queue", function(ch) { + return ch.assertQueue('test.check-queue', QUEUE_OPTS) + .then(function(qok) { + return ch.checkQueue('test.check-queue'); + }); + }); -chtest("assert and check exchange", function(ch) { - return ch.assertExchange('test.check-exchange', 'direct', EX_OPTS) - .then(function(eok) { - assert.equal('test.check-exchange', eok.exchange); - return ch.checkExchange('test.check-exchange'); - }); -}); + chtest("assert and check exchange", function(ch) { + return ch.assertExchange('test.check-exchange', 'direct', EX_OPTS) + .then(function(eok) { + assert.equal('test.check-exchange', eok.exchange); + return ch.checkExchange('test.check-exchange'); + }); + }); -chtest("fail on reasserting queue with different options", - function(ch) { - var q = 'test.reassert-queue'; - return ch.assertQueue( - q, {durable: false, autoDelete: true}) - .then(function() { - return expectFail( - ch.assertQueue(q, {durable: false, - autoDelete: false})); - }); - }); - -chtest("fail on checking a queue that's not there", function(ch) { - return expectFail(ch.checkQueue('test.random-' + randomString())); -}); + chtest("fail on reasserting queue with different options", + function(ch) { + var q = 'test.reassert-queue'; + return ch.assertQueue( + q, {durable: false, autoDelete: true}) + .then(function() { + return expectFail( + ch.assertQueue(q, {durable: false, + autoDelete: false})); + }); + }); + + chtest("fail on checking a queue that's not there", function(ch) { + return expectFail(ch.checkQueue('test.random-' + randomString())); + }); -chtest("fail on checking an exchange that's not there", function(ch) { - return expectFail(ch.checkExchange('test.random-' + randomString())); -}); + chtest("fail on checking an exchange that's not there", function(ch) { + return expectFail(ch.checkExchange('test.random-' + randomString())); + }); -chtest("fail on reasserting exchange with different type", - function(ch) { - var ex = 'test.reassert-ex'; - return ch.assertExchange(ex, 'fanout', EX_OPTS) - .then(function() { - return expectFail( - ch.assertExchange(ex, 'direct', EX_OPTS)); - }); - }); - -chtest("channel break on publishing to non-exchange", function(ch) { - return new Promise(function(resolve) { - ch.on('error', resolve); - ch.publish(randomString(), '', Buffer.from('foobar')); + chtest("fail on reasserting exchange with different type", + function(ch) { + var ex = 'test.reassert-ex'; + return ch.assertExchange(ex, 'fanout', EX_OPTS) + .then(function() { + return expectFail( + ch.assertExchange(ex, 'direct', EX_OPTS)); + }); + }); + + chtest("channel break on publishing to non-exchange", function(ch) { + return new Promise(function(resolve) { + ch.on('error', resolve); + ch.publish(randomString(), '', Buffer.from('foobar')); + }); }); -}); -chtest("delete queue", function(ch) { - var q = 'test.delete-queue'; - return Promise.all([ - ch.assertQueue(q, QUEUE_OPTS), - ch.checkQueue(q)]) - .then(function() { - return ch.deleteQueue(q);}) - .then(function() { - return expectFail(ch.checkQueue(q));}); -}); + chtest("delete queue", function(ch) { + var q = 'test.delete-queue'; + return Promise.all([ + ch.assertQueue(q, QUEUE_OPTS), + ch.checkQueue(q)]) + .then(function() { + return ch.deleteQueue(q);}) + .then(function() { + return expectFail(ch.checkQueue(q));}); + }); -chtest("delete exchange", function(ch) { - var ex = 'test.delete-exchange'; - return Promise.all([ - ch.assertExchange(ex, 'fanout', EX_OPTS), - ch.checkExchange(ex)]) - .then(function() { - return ch.deleteExchange(ex);}) - .then(function() { - return expectFail(ch.checkExchange(ex));}); -}); + chtest("delete exchange", function(ch) { + var ex = 'test.delete-exchange'; + return Promise.all([ + ch.assertExchange(ex, 'fanout', EX_OPTS), + ch.checkExchange(ex)]) + .then(function() { + return ch.deleteExchange(ex);}) + .then(function() { + return expectFail(ch.checkExchange(ex));}); + }); }); @@ -173,347 +183,347 @@ function waitForMessages(q, num) { suite("sendMessage", function() { -// publish different size messages -chtest("send to queue and get from queue", function(ch) { - var q = 'test.send-to-q'; - var msg = randomString(); - return Promise.all([ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q)]) - .then(function() { - ch.sendToQueue(q, Buffer.from(msg)); - return waitForMessages(q); - }) - .then(function() { - return ch.get(q, {noAck: true}); - }) - .then(function(m) { - assert(m); - assert.equal(msg, m.content.toString()); - }); -}); + // publish different size messages + chtest("send to queue and get from queue", function(ch) { + var q = 'test.send-to-q'; + var msg = randomString(); + return Promise.all([ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q)]) + .then(function() { + ch.sendToQueue(q, Buffer.from(msg)); + return waitForMessages(q); + }) + .then(function() { + return ch.get(q, {noAck: true}); + }) + .then(function(m) { + assert(m); + assert.equal(msg, m.content.toString()); + }); + }); -chtest("send (and get) zero content to queue", function(ch) { - var q = 'test.send-to-q'; - var msg = Buffer.alloc(0); - return Promise.all([ - ch.assertQueue(q, QUEUE_OPTS), - ch.purgeQueue(q)]) - .then(function() { - ch.sendToQueue(q, msg); - return waitForMessages(q);}) - .then(function() { - return ch.get(q, {noAck: true});}) - .then(function(m) { - assert(m); - assert.deepEqual(msg, m.content); - }); -}); + chtest("send (and get) zero content to queue", function(ch) { + var q = 'test.send-to-q'; + var msg = Buffer.alloc(0); + return Promise.all([ + ch.assertQueue(q, QUEUE_OPTS), + ch.purgeQueue(q)]) + .then(function() { + ch.sendToQueue(q, msg); + return waitForMessages(q);}) + .then(function() { + return ch.get(q, {noAck: true});}) + .then(function(m) { + assert(m); + assert.deepEqual(msg, m.content); + }); + }); }); suite("binding, consuming", function() { -// bind, publish, get -chtest("route message", function(ch) { - var ex = 'test.route-message'; - var q = 'test.route-message-q'; - var msg = randomString(); - - return Promise.all([ - ch.assertExchange(ex, 'fanout', EX_OPTS), - ch.assertQueue(q, QUEUE_OPTS), - ch.purgeQueue(q), - ch.bindQueue(q, ex, '', {})]) - .then(function() { - ch.publish(ex, '', Buffer.from(msg)); - return waitForMessages(q);}) - .then(function() { - return ch.get(q, {noAck: true});}) - .then(function(m) { - assert(m); - assert.equal(msg, m.content.toString()); - }); -}); + // bind, publish, get + chtest("route message", function(ch) { + var ex = 'test.route-message'; + var q = 'test.route-message-q'; + var msg = randomString(); -// send to queue, purge, get-empty -chtest("purge queue", function(ch) { - var q = 'test.purge-queue'; - return ch.assertQueue(q, {durable: false}) - .then(function() { - ch.sendToQueue(q, Buffer.from('foobar')); - return waitForMessages(q);}) - .then(function() { - ch.purgeQueue(q); - return ch.get(q, {noAck: true});}) - .then(function(m) { - assert(!m); // get-empty - }); -}); - -// bind again, unbind, publish, get-empty -chtest("unbind queue", function(ch) { - var ex = 'test.unbind-queue-ex'; - var q = 'test.unbind-queue'; - var viabinding = randomString(); - var direct = randomString(); - - return Promise.all([ - ch.assertExchange(ex, 'fanout', EX_OPTS), - ch.assertQueue(q, QUEUE_OPTS), - ch.purgeQueue(q), - ch.bindQueue(q, ex, '', {})]) - .then(function() { - ch.publish(ex, '', Buffer.from('foobar')); - return waitForMessages(q);}) - .then(function() { // message got through! - return ch.get(q, {noAck:true}) - .then(function(m) {assert(m);});}) - .then(function() { - return ch.unbindQueue(q, ex, '', {});}) - .then(function() { - // via the no-longer-existing binding - ch.publish(ex, '', Buffer.from(viabinding)); - // direct to the queue - ch.sendToQueue(q, Buffer.from(direct)); - return waitForMessages(q);}) - .then(function() {return ch.get(q)}) - .then(function(m) { - // the direct to queue message got through, the via-binding - // message (sent first) did not - assert.equal(direct, m.content.toString()); - }); -}); - -// To some extent this is now just testing semantics of the server, -// but we can at least try out a few settings, and consume. -chtest("consume via exchange-exchange binding", function(ch) { - var ex1 = 'test.ex-ex-binding1', ex2 = 'test.ex-ex-binding2'; - var q = 'test.ex-ex-binding-q'; - var rk = 'test.routing.key', msg = randomString(); - return Promise.all([ - ch.assertExchange(ex1, 'direct', EX_OPTS), - ch.assertExchange(ex2, 'fanout', - {durable: false, internal: true}), - ch.assertQueue(q, QUEUE_OPTS), - ch.purgeQueue(q), - ch.bindExchange(ex2, ex1, rk, {}), - ch.bindQueue(q, ex2, '', {})]) - .then(function() { - return new Promise(function(resolve, reject) { - function delivery(m) { - if (m.content.toString() === msg) resolve(); - else reject(new Error("Wrong message")); - } - ch.consume(q, delivery, {noAck: true}) - .then(function() { - ch.publish(ex1, rk, Buffer.from(msg)); - }); + return Promise.all([ + ch.assertExchange(ex, 'fanout', EX_OPTS), + ch.assertQueue(q, QUEUE_OPTS), + ch.purgeQueue(q), + ch.bindQueue(q, ex, '', {})]) + .then(function() { + ch.publish(ex, '', Buffer.from(msg)); + return waitForMessages(q);}) + .then(function() { + return ch.get(q, {noAck: true});}) + .then(function(m) { + assert(m); + assert.equal(msg, m.content.toString()); }); - }); -}); - -// bind again, unbind, publish, get-empty -chtest("unbind exchange", function(ch) { - var source = 'test.unbind-ex-source'; - var dest = 'test.unbind-ex-dest'; - var q = 'test.unbind-ex-queue'; - var viabinding = randomString(); - var direct = randomString(); - - return Promise.all([ - ch.assertExchange(source, 'fanout', EX_OPTS), - ch.assertExchange(dest, 'fanout', EX_OPTS), - ch.assertQueue(q, QUEUE_OPTS), - ch.purgeQueue(q), - ch.bindExchange(dest, source, '', {}), - ch.bindQueue(q, dest, '', {})]) - .then(function() { - ch.publish(source, '', Buffer.from('foobar')); - return waitForMessages(q);}) - .then(function() { // message got through! - return ch.get(q, {noAck:true}) - .then(function(m) {assert(m);});}) - .then(function() { - return ch.unbindExchange(dest, source, '', {});}) - .then(function() { - // via the no-longer-existing binding - ch.publish(source, '', Buffer.from(viabinding)); - // direct to the queue - ch.sendToQueue(q, Buffer.from(direct)); - return waitForMessages(q);}) - .then(function() {return ch.get(q)}) - .then(function(m) { - // the direct to queue message got through, the via-binding - // message (sent first) did not - assert.equal(direct, m.content.toString()); - }); -}); + }); -// This is a bit convoluted. Sorry. -chtest("cancel consumer", function(ch) { - var q = 'test.consumer-cancel'; - var ctag; - var recv1 = new Promise(function (resolve, reject) { - Promise.all([ - ch.assertQueue(q, QUEUE_OPTS), - ch.purgeQueue(q), - // My callback is 'resolve the promise in `arrived`' - ch.consume(q, resolve, {noAck:true}) - .then(function(ok) { - ctag = ok.consumerTag; - ch.sendToQueue(q, Buffer.from('foo')); - })]); + // send to queue, purge, get-empty + chtest("purge queue", function(ch) { + var q = 'test.purge-queue'; + return ch.assertQueue(q, {durable: false}) + .then(function() { + ch.sendToQueue(q, Buffer.from('foobar')); + return waitForMessages(q);}) + .then(function() { + ch.purgeQueue(q); + return ch.get(q, {noAck: true});}) + .then(function(m) { + assert(!m); // get-empty + }); }); - // A message should arrive because of the consume - return recv1.then(function() { - var recv2 = Promise.all([ - ch.cancel(ctag).then(function() { - return ch.sendToQueue(q, Buffer.from('bar')); - }), - // but check a message did arrive in the queue - waitForMessages(q)]) + // bind again, unbind, publish, get-empty + chtest("unbind queue", function(ch) { + var ex = 'test.unbind-queue-ex'; + var q = 'test.unbind-queue'; + var viabinding = randomString(); + var direct = randomString(); + + return Promise.all([ + ch.assertExchange(ex, 'fanout', EX_OPTS), + ch.assertQueue(q, QUEUE_OPTS), + ch.purgeQueue(q), + ch.bindQueue(q, ex, '', {})]) .then(function() { - return ch.get(q, {noAck:true}); - }) + ch.publish(ex, '', Buffer.from('foobar')); + return waitForMessages(q);}) + .then(function() { // message got through! + return ch.get(q, {noAck:true}) + .then(function(m) {assert(m);});}) + .then(function() { + return ch.unbindQueue(q, ex, '', {});}) + .then(function() { + // via the no-longer-existing binding + ch.publish(ex, '', Buffer.from(viabinding)); + // direct to the queue + ch.sendToQueue(q, Buffer.from(direct)); + return waitForMessages(q);}) + .then(function() {return ch.get(q)}) .then(function(m) { - // I'm going to reject it, because I flip succeed/fail - // just below - if (m.content.toString() === 'bar') { - throw new Error(); - } + // the direct to queue message got through, the via-binding + // message (sent first) did not + assert.equal(direct, m.content.toString()); }); + }); - return expectFail(recv2); + // To some extent this is now just testing semantics of the server, + // but we can at least try out a few settings, and consume. + chtest("consume via exchange-exchange binding", function(ch) { + var ex1 = 'test.ex-ex-binding1', ex2 = 'test.ex-ex-binding2'; + var q = 'test.ex-ex-binding-q'; + var rk = 'test.routing.key', msg = randomString(); + return Promise.all([ + ch.assertExchange(ex1, 'direct', EX_OPTS), + ch.assertExchange(ex2, 'fanout', + {durable: false, internal: true}), + ch.assertQueue(q, QUEUE_OPTS), + ch.purgeQueue(q), + ch.bindExchange(ex2, ex1, rk, {}), + ch.bindQueue(q, ex2, '', {})]) + .then(function() { + return new Promise(function(resolve, reject) { + function delivery(m) { + if (m.content.toString() === msg) resolve(); + else reject(new Error("Wrong message")); + } + ch.consume(q, delivery, {noAck: true}) + .then(function() { + ch.publish(ex1, rk, Buffer.from(msg)); + }); + }); + }); }); -}); -chtest("cancelled consumer", function(ch) { - var q = 'test.cancelled-consumer'; - return new Promise(function(resolve, reject) { + // bind again, unbind, publish, get-empty + chtest("unbind exchange", function(ch) { + var source = 'test.unbind-ex-source'; + var dest = 'test.unbind-ex-dest'; + var q = 'test.unbind-ex-queue'; + var viabinding = randomString(); + var direct = randomString(); + return Promise.all([ - ch.assertQueue(q), + ch.assertExchange(source, 'fanout', EX_OPTS), + ch.assertExchange(dest, 'fanout', EX_OPTS), + ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q), - ch.consume(q, function(msg) { - if (msg === null) resolve(); - else reject(new Error('Message not expected')); - })]) + ch.bindExchange(dest, source, '', {}), + ch.bindQueue(q, dest, '', {})]) .then(function() { - return ch.deleteQueue(q); + ch.publish(source, '', Buffer.from('foobar')); + return waitForMessages(q);}) + .then(function() { // message got through! + return ch.get(q, {noAck:true}) + .then(function(m) {assert(m);});}) + .then(function() { + return ch.unbindExchange(dest, source, '', {});}) + .then(function() { + // via the no-longer-existing binding + ch.publish(source, '', Buffer.from(viabinding)); + // direct to the queue + ch.sendToQueue(q, Buffer.from(direct)); + return waitForMessages(q);}) + .then(function() {return ch.get(q)}) + .then(function(m) { + // the direct to queue message got through, the via-binding + // message (sent first) did not + assert.equal(direct, m.content.toString()); }); }); -}); -// ack, by default, removes a single message from the queue -chtest("ack", function(ch) { - var q = 'test.ack'; - var msg1 = randomString(), msg2 = randomString(); - - return Promise.all([ - ch.assertQueue(q, QUEUE_OPTS), - ch.purgeQueue(q)]) - .then(function() { - ch.sendToQueue(q, Buffer.from(msg1)); - ch.sendToQueue(q, Buffer.from(msg2)); - return waitForMessages(q, 2); - }) - .then(function() { - return ch.get(q, {noAck: false}) - }) - .then(function(m) { - assert.equal(msg1, m.content.toString()); - ch.ack(m); - // %%% is there a race here? may depend on - // rabbitmq-sepcific semantics - return ch.get(q); - }) - .then(function(m) { - assert(m); - assert.equal(msg2, m.content.toString()); + // This is a bit convoluted. Sorry. + chtest("cancel consumer", function(ch) { + var q = 'test.consumer-cancel'; + var ctag; + var recv1 = new Promise(function (resolve, reject) { + Promise.all([ + ch.assertQueue(q, QUEUE_OPTS), + ch.purgeQueue(q), + // My callback is 'resolve the promise in `arrived`' + ch.consume(q, resolve, {noAck:true}) + .then(function(ok) { + ctag = ok.consumerTag; + ch.sendToQueue(q, Buffer.from('foo')); + })]); }); -}); -// Nack, by default, puts a message back on the queue (where in the -// queue is up to the server) -chtest("nack", function(ch) { - var q = 'test.nack'; - var msg1 = randomString(); - - return Promise.all([ - ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q)]) - .then(function() { - ch.sendToQueue(q, Buffer.from(msg1)); - return waitForMessages(q);}) - .then(function() { - return ch.get(q, {noAck: false})}) - .then(function(m) { - assert.equal(msg1, m.content.toString()); - ch.nack(m); - return waitForMessages(q);}) - .then(function() { - return ch.get(q);}) - .then(function(m) { - assert(m); - assert.equal(msg1, m.content.toString()); + // A message should arrive because of the consume + return recv1.then(function() { + var recv2 = Promise.all([ + ch.cancel(ctag).then(function() { + return ch.sendToQueue(q, Buffer.from('bar')); + }), + // but check a message did arrive in the queue + waitForMessages(q)]) + .then(function() { + return ch.get(q, {noAck:true}); + }) + .then(function(m) { + // I'm going to reject it, because I flip succeed/fail + // just below + if (m.content.toString() === 'bar') { + throw new Error(); + } + }); + + return expectFail(recv2); }); -}); + }); -// reject is a near-synonym for nack, the latter of which is not -// available in earlier RabbitMQ (or in AMQP proper). -chtest("reject", function(ch) { - var q = 'test.reject'; - var msg1 = randomString(); - - return Promise.all([ - ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q)]) - .then(function() { - ch.sendToQueue(q, Buffer.from(msg1)); - return waitForMessages(q);}) - .then(function() { - return ch.get(q, {noAck: false})}) - .then(function(m) { - assert.equal(msg1, m.content.toString()); - ch.reject(m); - return waitForMessages(q);}) - .then(function() { - return ch.get(q);}) - .then(function(m) { - assert(m); - assert.equal(msg1, m.content.toString()); + chtest("cancelled consumer", function(ch) { + var q = 'test.cancelled-consumer'; + return new Promise(function(resolve, reject) { + return Promise.all([ + ch.assertQueue(q), + ch.purgeQueue(q), + ch.consume(q, function(msg) { + if (msg === null) resolve(); + else reject(new Error('Message not expected')); + })]) + .then(function() { + return ch.deleteQueue(q); + }); }); -}); + }); + + // ack, by default, removes a single message from the queue + chtest("ack", function(ch) { + var q = 'test.ack'; + var msg1 = randomString(), msg2 = randomString(); + + return Promise.all([ + ch.assertQueue(q, QUEUE_OPTS), + ch.purgeQueue(q)]) + .then(function() { + ch.sendToQueue(q, Buffer.from(msg1)); + ch.sendToQueue(q, Buffer.from(msg2)); + return waitForMessages(q, 2); + }) + .then(function() { + return ch.get(q, {noAck: false}) + }) + .then(function(m) { + assert.equal(msg1, m.content.toString()); + ch.ack(m); + // %%% is there a race here? may depend on + // rabbitmq-sepcific semantics + return ch.get(q); + }) + .then(function(m) { + assert(m); + assert.equal(msg2, m.content.toString()); + }); + }); + + // Nack, by default, puts a message back on the queue (where in the + // queue is up to the server) + chtest("nack", function(ch) { + var q = 'test.nack'; + var msg1 = randomString(); + + return Promise.all([ + ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q)]) + .then(function() { + ch.sendToQueue(q, Buffer.from(msg1)); + return waitForMessages(q);}) + .then(function() { + return ch.get(q, {noAck: false})}) + .then(function(m) { + assert.equal(msg1, m.content.toString()); + ch.nack(m); + return waitForMessages(q);}) + .then(function() { + return ch.get(q);}) + .then(function(m) { + assert(m); + assert.equal(msg1, m.content.toString()); + }); + }); -chtest("prefetch", function(ch) { - var q = 'test.prefetch'; - return Promise.all([ - ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q), - ch.prefetch(1)]) - .then(function() { - ch.sendToQueue(q, Buffer.from('foobar')); - ch.sendToQueue(q, Buffer.from('foobar')); - return waitForMessages(q, 2); - }) - .then(function() { - return new Promise(function(resolve) { - var messageCount = 0; - function receive(msg) { - ch.ack(msg); - if (++messageCount > 1) { - resolve(messageCount); + // reject is a near-synonym for nack, the latter of which is not + // available in earlier RabbitMQ (or in AMQP proper). + chtest("reject", function(ch) { + var q = 'test.reject'; + var msg1 = randomString(); + + return Promise.all([ + ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q)]) + .then(function() { + ch.sendToQueue(q, Buffer.from(msg1)); + return waitForMessages(q);}) + .then(function() { + return ch.get(q, {noAck: false})}) + .then(function(m) { + assert.equal(msg1, m.content.toString()); + ch.reject(m); + return waitForMessages(q);}) + .then(function() { + return ch.get(q);}) + .then(function(m) { + assert(m); + assert.equal(msg1, m.content.toString()); + }); + }); + + chtest("prefetch", function(ch) { + var q = 'test.prefetch'; + return Promise.all([ + ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q), + ch.prefetch(1)]) + .then(function() { + ch.sendToQueue(q, Buffer.from('foobar')); + ch.sendToQueue(q, Buffer.from('foobar')); + return waitForMessages(q, 2); + }) + .then(function() { + return new Promise(function(resolve) { + var messageCount = 0; + function receive(msg) { + ch.ack(msg); + if (++messageCount > 1) { + resolve(messageCount); + } } - } - return ch.consume(q, receive, {noAck: false}) + return ch.consume(q, receive, {noAck: false}) + }); + }) + .then(function(c) { + return assert.equal(2, c); }); - }) - .then(function(c) { - return assert.equal(2, c); - }); -}); + }); -chtest('close', function(ch) { - // Resolving promise guarantees - // channel is closed - return ch.close(); -}); + chtest('close', function(ch) { + // Resolving promise guarantees + // channel is closed + return ch.close(); + }); }); @@ -521,76 +531,76 @@ var confirmtest = channel_test.bind(null, 'createConfirmChannel'); suite("confirms", function() { -confirmtest('message is confirmed', function(ch) { - var q = 'test.confirm-message'; - return Promise.all([ - ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q)]) - .then(function() { - return ch.sendToQueue(q, Buffer.from('bleep')); - }); -}); - -// Usually one can provoke the server into confirming more than one -// message in an ack by simply sending a few messages in quick -// succession; a bit unscientific I know. Luckily we can eavesdrop on -// the acknowledgements coming through to see if we really did get a -// multi-ack. -confirmtest('multiple confirms', function(ch) { - var q = 'test.multiple-confirms'; - return Promise.all([ - ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q)]) - .then(function() { - var multipleRainbows = false; - ch.on('ack', function(a) { - if (a.multiple) multipleRainbows = true; + confirmtest('message is confirmed', function(ch) { + var q = 'test.confirm-message'; + return Promise.all([ + ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q)]) + .then(function() { + return ch.sendToQueue(q, Buffer.from('bleep')); }); + }); - function prod(num) { - var cs = []; + // Usually one can provoke the server into confirming more than one + // message in an ack by simply sending a few messages in quick + // succession; a bit unscientific I know. Luckily we can eavesdrop on + // the acknowledgements coming through to see if we really did get a + // multi-ack. + confirmtest('multiple confirms', function(ch) { + var q = 'test.multiple-confirms'; + return Promise.all([ + ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q)]) + .then(function() { + var multipleRainbows = false; + ch.on('ack', function(a) { + if (a.multiple) multipleRainbows = true; + }); - function sendAndPushPromise() { - var conf = promisify(function(cb) { - return ch.sendToQueue(q, Buffer.from('bleep'), {}, cb); - })(); - cs.push(conf); - } + function prod(num) { + var cs = []; - for (var i=0; i < num; i++) sendAndPushPromise(); - - return Promise.all(cs).then(function() { - if (multipleRainbows) return true; - else if (num > 500) throw new Error( - "Couldn't provoke the server" + - " into multi-acking with " + num + - " messages; giving up"); - else { - //console.warn("Failed with " + num + "; trying " + num * 2); - return prod(num * 2); + function sendAndPushPromise() { + var conf = promisify(function(cb) { + return ch.sendToQueue(q, Buffer.from('bleep'), {}, cb); + })(); + cs.push(conf); } - }); - } - return prod(5); - }); -}); -confirmtest('wait for confirms', function(ch) { - for (var i=0; i < 1000; i++) { - ch.publish('', '', Buffer.from('foobar'), {}); - } - return ch.waitForConfirms(); -}) - -confirmtest('works when channel is closed', function(ch) { - for (var i=0; i < 1000; i++) { - ch.publish('', '', Buffer.from('foobar'), {}); - } - return ch.close().then(function () { - return ch.waitForConfirms() - }).then(function () { - assert.strictEqual(true, false, 'Wait should have failed.') - }, function (e) { - assert.strictEqual(e.message, 'channel closed') + for (var i=0; i < num; i++) sendAndPushPromise(); + + return Promise.all(cs).then(function() { + if (multipleRainbows) return true; + else if (num > 500) throw new Error( + "Couldn't provoke the server" + + " into multi-acking with " + num + + " messages; giving up"); + else { + //console.warn("Failed with " + num + "; trying " + num * 2); + return prod(num * 2); + } + }); + } + return prod(5); + }); + }); + + confirmtest('wait for confirms', function(ch) { + for (var i=0; i < 1000; i++) { + ch.publish('', '', Buffer.from('foobar'), {}); + } + return ch.waitForConfirms(); + }) + + confirmtest('works when channel is closed', function(ch) { + for (var i=0; i < 1000; i++) { + ch.publish('', '', Buffer.from('foobar'), {}); + } + return ch.close().then(function () { + return ch.waitForConfirms() + }).then(function () { + assert.strictEqual(true, false, 'Wait should have failed.') + }, function (e) { + assert.strictEqual(e.message, 'channel closed') + }); }); -}); }); From 4d259514ba5b894ab235e531595bd80faba761d2 Mon Sep 17 00:00:00 2001 From: Stephen Cresswell <229672+cressie176@users.noreply.github.com> Date: Thu, 22 Feb 2024 08:39:01 +0000 Subject: [PATCH 2/3] Automatically remove update-secret-ok event handler --- lib/connection.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/connection.js b/lib/connection.js index 3586fdfd..1c58ac5f 100644 --- a/lib/connection.js +++ b/lib/connection.js @@ -366,7 +366,7 @@ class Connection extends EventEmitter { newSecret, reason }); - this.on('update-secret-ok', cb); + this.once('update-secret-ok', cb); } // === From c4da58b19e41ce5db7488755c11ab20324e0f23c Mon Sep 17 00:00:00 2001 From: Stephen Cresswell <229672+cressie176@users.noreply.github.com> Date: Thu, 22 Feb 2024 08:49:32 +0000 Subject: [PATCH 3/3] Make quotes in updateSecret consistent with other tests --- test/channel_api.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/channel_api.js b/test/channel_api.js index 77ed496a..f3a1691d 100644 --- a/test/channel_api.js +++ b/test/channel_api.js @@ -59,10 +59,10 @@ suite("connect", function() { }); -suite('updateSecret', function() { - test('updateSecret', function(done) { +suite("updateSecret", function() { + test("updateSecret", function(done) { connect().then(function(c) { - c.updateSecret(Buffer.from('new secret'), 'no reason') + c.updateSecret(Buffer.from("new secret"), "no reason") .then(succeed(done), fail(done)) .finally(function() { c.close(); }); });