From 6ba50b0193a042b61abd5e0ae57110c015e54d75 Mon Sep 17 00:00:00 2001 From: Hyeonjae Park Date: Sun, 20 Oct 2019 21:22:21 +0900 Subject: [PATCH 1/3] Add named queue binding --- lib/src/client/impl/exchange_impl.dart | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/lib/src/client/impl/exchange_impl.dart b/lib/src/client/impl/exchange_impl.dart index 688804d..8425612 100644 --- a/lib/src/client/impl/exchange_impl.dart +++ b/lib/src/client/impl/exchange_impl.dart @@ -63,4 +63,24 @@ class _ExchangeImpl implements Exchange { } return queue.consume(consumerTag: consumerTag, noAck: noAck); } + + Future bindQueueConsumer(String queueName, List routingKeys, + {String consumerTag, bool noAck = true}) async { + // Fanout and headers exchanges do not need to specify any keys. Use the default one if none is specified + if ((type == ExchangeType.FANOUT || type == ExchangeType.HEADERS) && + (routingKeys == null || routingKeys.isEmpty)) { + routingKeys = [""]; + } + + if ((routingKeys == null || routingKeys.isEmpty)) { + throw ArgumentError( + "One or more routing keys needs to be specified for this exchange type"); + } + + Queue queue = await channel.queue(queueName); + for (String routingKey in routingKeys) { + await queue.bind(this, routingKey); + } + return queue.consume(consumerTag: consumerTag, noAck: noAck); + } } From 67141fcea6006603a53a41b0fa6cf76b382316e2 Mon Sep 17 00:00:00 2001 From: Hyeonjae Park Date: Wed, 6 Nov 2019 14:51:20 +0900 Subject: [PATCH 2/3] Add testcase and API docs --- API.md | 1 + lib/src/client/exchange.dart | 11 +++++++++++ test/lib/exchange_test.dart | 11 +++++++++++ 3 files changed, 23 insertions(+) diff --git a/API.md b/API.md index f9f6f4b..7015e51 100644 --- a/API.md +++ b/API.md @@ -86,6 +86,7 @@ The following table summarizes the methods available to an AMQP exchange declare | delete() | Delete the exchange. | publish() | Publish message using a routing key | bindPrivateQueueConsumer() | Convenience method that allocates a private [queue](#queues), binds it to the exchange via a routing key and returns a [consumer](#consumers) for processing messages. +| bindQueueConsumer() | Convenience method that allocates a named [queue](#queues), binds it to the exchange via a routing key and returns a [consumer](#consumers) for processing messages. ## Queues diff --git a/lib/src/client/exchange.dart b/lib/src/client/exchange.dart index 1406bc1..987079a 100644 --- a/lib/src/client/exchange.dart +++ b/lib/src/client/exchange.dart @@ -44,4 +44,15 @@ abstract class Exchange { /// messages or not. Future bindPrivateQueueConsumer(List routingKeys, {String consumerTag, bool noAck = true}); + + /// Allocate a named [Queue], bind it to this exchange using the supplied [routingKeys], + /// allocate a [Consumer] and return a [Future]. + /// + /// You may specify a queue name and a [consumerTag] to label this consumer. If left unspecified, + /// the server will assign a random tag to this consumer. Consumer tags are local to the current channel. + /// + /// The [noAck] flag will notify the server whether the consumer is expected to acknowledge incoming + /// messages or not. + Future bindQueueConsumer(String queueName, List routingKeys, + {String consumerTag, bool noAck = true}); } diff --git a/test/lib/exchange_test.dart b/test/lib/exchange_test.dart index 205c1fd..af2e4da 100644 --- a/test/lib/exchange_test.dart +++ b/test/lib/exchange_test.dart @@ -297,6 +297,17 @@ main({bool enableLogger = true}) { ex is ArgumentError && ex.message == "Exchange cannot be null")); }); + + test("declare exchange and bind named queue consumer", () async { + Channel channel = await client.channel(); + Exchange exchange = + await channel.exchange("ex_test_1", ExchangeType.DIRECT); + Consumer consumer = await exchange.bindQueueConsumer("my_test_queue", ["test"]); + expect(consumer.channel, const TypeMatcher()); + expect(consumer.queue, const TypeMatcher()); + expect(consumer.tag, isNotEmpty); + expect(consumer.queue.name, equals("my_test_queue")); + }); }); }); } From 4ad2bfdef04e1b8d4060e436773a3c65b033355f Mon Sep 17 00:00:00 2001 From: Hyeonjae Park Date: Wed, 6 Nov 2019 14:56:25 +0900 Subject: [PATCH 3/3] Apply dartfmt --- lib/src/client/exchange.dart | 2 +- test/lib/exchange_test.dart | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/src/client/exchange.dart b/lib/src/client/exchange.dart index 987079a..c9c52ea 100644 --- a/lib/src/client/exchange.dart +++ b/lib/src/client/exchange.dart @@ -48,7 +48,7 @@ abstract class Exchange { /// Allocate a named [Queue], bind it to this exchange using the supplied [routingKeys], /// allocate a [Consumer] and return a [Future]. /// - /// You may specify a queue name and a [consumerTag] to label this consumer. If left unspecified, + /// You may specify a queue name and a [consumerTag] to label this consumer. If left unspecified, /// the server will assign a random tag to this consumer. Consumer tags are local to the current channel. /// /// The [noAck] flag will notify the server whether the consumer is expected to acknowledge incoming diff --git a/test/lib/exchange_test.dart b/test/lib/exchange_test.dart index af2e4da..5d157ee 100644 --- a/test/lib/exchange_test.dart +++ b/test/lib/exchange_test.dart @@ -302,7 +302,8 @@ main({bool enableLogger = true}) { Channel channel = await client.channel(); Exchange exchange = await channel.exchange("ex_test_1", ExchangeType.DIRECT); - Consumer consumer = await exchange.bindQueueConsumer("my_test_queue", ["test"]); + Consumer consumer = + await exchange.bindQueueConsumer("my_test_queue", ["test"]); expect(consumer.channel, const TypeMatcher()); expect(consumer.queue, const TypeMatcher()); expect(consumer.tag, isNotEmpty);