diff --git a/API.md b/API.md index f9f6f4b..7015e51 100644 --- a/API.md +++ b/API.md @@ -86,6 +86,7 @@ The following table summarizes the methods available to an AMQP exchange declare | delete() | Delete the exchange. | publish() | Publish message using a routing key | bindPrivateQueueConsumer() | Convenience method that allocates a private [queue](#queues), binds it to the exchange via a routing key and returns a [consumer](#consumers) for processing messages. +| bindQueueConsumer() | Convenience method that allocates a named [queue](#queues), binds it to the exchange via a routing key and returns a [consumer](#consumers) for processing messages. ## Queues diff --git a/lib/src/client/exchange.dart b/lib/src/client/exchange.dart index 1406bc1..c9c52ea 100644 --- a/lib/src/client/exchange.dart +++ b/lib/src/client/exchange.dart @@ -44,4 +44,15 @@ abstract class Exchange { /// messages or not. Future bindPrivateQueueConsumer(List routingKeys, {String consumerTag, bool noAck = true}); + + /// Allocate a named [Queue], bind it to this exchange using the supplied [routingKeys], + /// allocate a [Consumer] and return a [Future]. + /// + /// You may specify a queue name and a [consumerTag] to label this consumer. If left unspecified, + /// the server will assign a random tag to this consumer. Consumer tags are local to the current channel. + /// + /// The [noAck] flag will notify the server whether the consumer is expected to acknowledge incoming + /// messages or not. + Future bindQueueConsumer(String queueName, List routingKeys, + {String consumerTag, bool noAck = true}); } diff --git a/lib/src/client/impl/exchange_impl.dart b/lib/src/client/impl/exchange_impl.dart index 688804d..8425612 100644 --- a/lib/src/client/impl/exchange_impl.dart +++ b/lib/src/client/impl/exchange_impl.dart @@ -63,4 +63,24 @@ class _ExchangeImpl implements Exchange { } return queue.consume(consumerTag: consumerTag, noAck: noAck); } + + Future bindQueueConsumer(String queueName, List routingKeys, + {String consumerTag, bool noAck = true}) async { + // Fanout and headers exchanges do not need to specify any keys. Use the default one if none is specified + if ((type == ExchangeType.FANOUT || type == ExchangeType.HEADERS) && + (routingKeys == null || routingKeys.isEmpty)) { + routingKeys = [""]; + } + + if ((routingKeys == null || routingKeys.isEmpty)) { + throw ArgumentError( + "One or more routing keys needs to be specified for this exchange type"); + } + + Queue queue = await channel.queue(queueName); + for (String routingKey in routingKeys) { + await queue.bind(this, routingKey); + } + return queue.consume(consumerTag: consumerTag, noAck: noAck); + } } diff --git a/test/lib/exchange_test.dart b/test/lib/exchange_test.dart index 205c1fd..5d157ee 100644 --- a/test/lib/exchange_test.dart +++ b/test/lib/exchange_test.dart @@ -297,6 +297,18 @@ main({bool enableLogger = true}) { ex is ArgumentError && ex.message == "Exchange cannot be null")); }); + + test("declare exchange and bind named queue consumer", () async { + Channel channel = await client.channel(); + Exchange exchange = + await channel.exchange("ex_test_1", ExchangeType.DIRECT); + Consumer consumer = + await exchange.bindQueueConsumer("my_test_queue", ["test"]); + expect(consumer.channel, const TypeMatcher()); + expect(consumer.queue, const TypeMatcher()); + expect(consumer.tag, isNotEmpty); + expect(consumer.queue.name, equals("my_test_queue")); + }); }); }); }