diff --git a/lib/src/client/channel.dart b/lib/src/client/channel.dart index 41cf8ff..09a98a3 100644 --- a/lib/src/client/channel.dart +++ b/lib/src/client/channel.dart @@ -19,12 +19,16 @@ abstract class Channel { /// /// The [autoDelete] flag will notify the server that the queue should be deleted when no more connections /// are using it. + /// + /// The [declare] flag can be set to false to skip the queue declaration step + /// for clients with read-only access to the broker. Future queue(String name, {bool passive = false, bool durable = false, bool exclusive = false, bool autoDelete = false, bool noWait = false, + bool declare = true, Map arguments}); /// A convenience method for allocating private queues. The client will allocate diff --git a/lib/src/client/impl/channel_impl.dart b/lib/src/client/impl/channel_impl.dart index 96e20e7..be33d12 100644 --- a/lib/src/client/impl/channel_impl.dart +++ b/lib/src/client/impl/channel_impl.dart @@ -468,6 +468,7 @@ class _ChannelImpl implements Channel { bool exclusive = false, bool autoDelete = false, bool noWait = false, + bool declare = true, Map? arguments}) { QueueDeclare queueRequest = QueueDeclare() ..reserved_1 = 0 @@ -480,6 +481,12 @@ class _ChannelImpl implements Channel { ..arguments = arguments; Completer opCompleter = Completer(); + + if (!declare) { + opCompleter.complete(_QueueImpl(this, name)); + return opCompleter.future; + } + writeMessage(queueRequest, completer: opCompleter, futurePayload: _QueueImpl(this, name), diff --git a/test/lib/queue_test.dart b/test/lib/queue_test.dart index 22bba0c..ffb076f 100644 --- a/test/lib/queue_test.dart +++ b/test/lib/queue_test.dart @@ -2,6 +2,7 @@ library dart_amqp.test.queues; import "dart:async"; +import 'package:dart_amqp/dart_amqp.dart'; import "package:test/test.dart"; import "package:dart_amqp/src/client.dart"; @@ -106,6 +107,33 @@ main({bool enableLogger = true}) { return testCompleter.future; }); + test("queue message delivery when consumer has RO access", () async { + Completer testCompleter = Completer(); + + // Use the second client to define the queue in advance and publish a + // message to it + Channel channel2 = await client2.channel(); + Queue target = await channel2.queue("test_ro"); + target.publish("Test payload"); + + // Pretend we are a RO consumer that cannot declare the queue but should + // still be able to consume from it. + Channel channel = await client.channel(); + Queue testQueue = await channel.queue("test_ro", declare: false); + Consumer consumer = await testQueue.consume(); + + expect(consumer.channel, const TypeMatcher()); + expect(consumer.queue, const TypeMatcher()); + expect(consumer.tag, isNotEmpty); + + consumer.listen(expectAsync1((AmqpMessage message) { + expect(message.payloadAsString, equals("Test payload")); + testCompleter.complete(); + })); + + return testCompleter.future; + }); + test("queue message delivery", () async { Completer testCompleter = Completer();