Skip to content

Commit

Permalink
Merge pull request #30 from hyeonjae/master
Browse files Browse the repository at this point in the history
Add named queue binding
  • Loading branch information
achilleasa authored Nov 6, 2019
2 parents 5362425 + 4ad2bfd commit 0ddc3ca
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 0 deletions.
1 change: 1 addition & 0 deletions API.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ The following table summarizes the methods available to an AMQP exchange declare
| delete() | Delete the exchange.
| publish() | Publish message using a routing key
| bindPrivateQueueConsumer() | Convenience method that allocates a private [queue](#queues), binds it to the exchange via a routing key and returns a [consumer](#consumers) for processing messages.
| bindQueueConsumer() | Convenience method that allocates a named [queue](#queues), binds it to the exchange via a routing key and returns a [consumer](#consumers) for processing messages.

## Queues

Expand Down
11 changes: 11 additions & 0 deletions lib/src/client/exchange.dart
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,15 @@ abstract class Exchange {
/// messages or not.
Future<Consumer> bindPrivateQueueConsumer(List<String> routingKeys,
{String consumerTag, bool noAck = true});

/// Allocate a named [Queue], bind it to this exchange using the supplied [routingKeys],
/// allocate a [Consumer] and return a [Future<Consumer>].
///
/// You may specify a queue name and a [consumerTag] to label this consumer. If left unspecified,
/// the server will assign a random tag to this consumer. Consumer tags are local to the current channel.
///
/// The [noAck] flag will notify the server whether the consumer is expected to acknowledge incoming
/// messages or not.
Future<Consumer> bindQueueConsumer(String queueName, List<String> routingKeys,
{String consumerTag, bool noAck = true});
}
20 changes: 20 additions & 0 deletions lib/src/client/impl/exchange_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,24 @@ class _ExchangeImpl implements Exchange {
}
return queue.consume(consumerTag: consumerTag, noAck: noAck);
}

Future<Consumer> bindQueueConsumer(String queueName, List<String> routingKeys,
{String consumerTag, bool noAck = true}) async {
// Fanout and headers exchanges do not need to specify any keys. Use the default one if none is specified
if ((type == ExchangeType.FANOUT || type == ExchangeType.HEADERS) &&
(routingKeys == null || routingKeys.isEmpty)) {
routingKeys = [""];
}

if ((routingKeys == null || routingKeys.isEmpty)) {
throw ArgumentError(
"One or more routing keys needs to be specified for this exchange type");
}

Queue queue = await channel.queue(queueName);
for (String routingKey in routingKeys) {
await queue.bind(this, routingKey);
}
return queue.consume(consumerTag: consumerTag, noAck: noAck);
}
}
12 changes: 12 additions & 0 deletions test/lib/exchange_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,18 @@ main({bool enableLogger = true}) {
ex is ArgumentError &&
ex.message == "Exchange cannot be null"));
});

test("declare exchange and bind named queue consumer", () async {
Channel channel = await client.channel();
Exchange exchange =
await channel.exchange("ex_test_1", ExchangeType.DIRECT);
Consumer consumer =
await exchange.bindQueueConsumer("my_test_queue", ["test"]);
expect(consumer.channel, const TypeMatcher<Channel>());
expect(consumer.queue, const TypeMatcher<Queue>());
expect(consumer.tag, isNotEmpty);
expect(consumer.queue.name, equals("my_test_queue"));
});
});
});
}

0 comments on commit 0ddc3ca

Please sign in to comment.