Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add named queue binding #30

Merged
merged 3 commits into from
Nov 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions API.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ The following table summarizes the methods available to an AMQP exchange declare
| delete() | Delete the exchange.
| publish() | Publish message using a routing key
| bindPrivateQueueConsumer() | Convenience method that allocates a private [queue](#queues), binds it to the exchange via a routing key and returns a [consumer](#consumers) for processing messages.
| bindQueueConsumer() | Convenience method that allocates a named [queue](#queues), binds it to the exchange via a routing key and returns a [consumer](#consumers) for processing messages.

## Queues

Expand Down
11 changes: 11 additions & 0 deletions lib/src/client/exchange.dart
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,15 @@ abstract class Exchange {
/// messages or not.
Future<Consumer> bindPrivateQueueConsumer(List<String> routingKeys,
{String consumerTag, bool noAck = true});

/// Allocate a named [Queue], bind it to this exchange using the supplied [routingKeys],
/// allocate a [Consumer] and return a [Future<Consumer>].
///
/// You may specify a queue name and a [consumerTag] to label this consumer. If left unspecified,
/// the server will assign a random tag to this consumer. Consumer tags are local to the current channel.
///
/// The [noAck] flag will notify the server whether the consumer is expected to acknowledge incoming
/// messages or not.
Future<Consumer> bindQueueConsumer(String queueName, List<String> routingKeys,
{String consumerTag, bool noAck = true});
}
20 changes: 20 additions & 0 deletions lib/src/client/impl/exchange_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,24 @@ class _ExchangeImpl implements Exchange {
}
return queue.consume(consumerTag: consumerTag, noAck: noAck);
}

Future<Consumer> bindQueueConsumer(String queueName, List<String> routingKeys,
hyeonjae marked this conversation as resolved.
Show resolved Hide resolved
{String consumerTag, bool noAck = true}) async {
// Fanout and headers exchanges do not need to specify any keys. Use the default one if none is specified
if ((type == ExchangeType.FANOUT || type == ExchangeType.HEADERS) &&
(routingKeys == null || routingKeys.isEmpty)) {
routingKeys = [""];
}

if ((routingKeys == null || routingKeys.isEmpty)) {
throw ArgumentError(
"One or more routing keys needs to be specified for this exchange type");
}

Queue queue = await channel.queue(queueName);
for (String routingKey in routingKeys) {
await queue.bind(this, routingKey);
}
return queue.consume(consumerTag: consumerTag, noAck: noAck);
}
}
12 changes: 12 additions & 0 deletions test/lib/exchange_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,18 @@ main({bool enableLogger = true}) {
ex is ArgumentError &&
ex.message == "Exchange cannot be null"));
});

test("declare exchange and bind named queue consumer", () async {
Channel channel = await client.channel();
Exchange exchange =
await channel.exchange("ex_test_1", ExchangeType.DIRECT);
Consumer consumer =
await exchange.bindQueueConsumer("my_test_queue", ["test"]);
expect(consumer.channel, const TypeMatcher<Channel>());
expect(consumer.queue, const TypeMatcher<Queue>());
expect(consumer.tag, isNotEmpty);
expect(consumer.queue.name, equals("my_test_queue"));
});
});
});
}