diff --git a/lib/src/client/impl/exchange_impl.dart b/lib/src/client/impl/exchange_impl.dart index a4e24dd..e2f5594 100644 --- a/lib/src/client/impl/exchange_impl.dart +++ b/lib/src/client/impl/exchange_impl.dart @@ -22,7 +22,7 @@ class _ExchangeImpl implements Exchange { } void publish(Object message, String routingKey, { MessageProperties properties, bool mandatory : false, bool immediate : false}) { - if (!type.isCustom && type != ExchangeType.FANOUT && (routingKey == null || routingKey.isEmpty)) { + if (!type.isCustom && type != ExchangeType.FANOUT && type != ExchangeType.HEADERS && (routingKey == null || routingKey.isEmpty)) { throw new ArgumentError("A valid routing key needs to be specified"); } @@ -37,8 +37,8 @@ class _ExchangeImpl implements Exchange { } Future bindPrivateQueueConsumer(List routingKeys, {String consumerTag, bool noAck: true}) { - // Fanout exchanges do not need to specify any keys. Use the default one if none is specified - if (type == ExchangeType.FANOUT && (routingKeys == null || routingKeys.isEmpty)) { + // Fanout and headers exchanges do not need to specify any keys. Use the default one if none is specified + if ((type == ExchangeType.FANOUT || type == ExchangeType.HEADERS) && (routingKeys == null || routingKeys.isEmpty)) { routingKeys = [""]; } diff --git a/lib/src/client/impl/queue_impl.dart b/lib/src/client/impl/queue_impl.dart index bd522ce..81d12a1 100644 --- a/lib/src/client/impl/queue_impl.dart +++ b/lib/src/client/impl/queue_impl.dart @@ -42,9 +42,9 @@ class _QueueImpl implements Queue { if (exchange == null) { throw new ArgumentError("Exchange cannot be null"); } - // Fanout exchanges do not need to specify any keys. Use the default one if none is specified + // Fanout and headers exchanges do not need to specify any keys. Use the default one if none is specified if (routingKey == null || routingKey.isEmpty) { - if (exchange.type == ExchangeType.FANOUT) { + if (exchange.type == ExchangeType.FANOUT || exchange.type == ExchangeType.HEADERS) { routingKey = ""; } else { throw new ArgumentError("A routing key needs to be specified to bind to this exchange type"); @@ -56,7 +56,8 @@ class _QueueImpl implements Queue { ..queue = name ..exchange = exchange.name ..routingKey = routingKey - ..noWait = noWait; + ..noWait = noWait + ..arguments = arguments; Completer completer = new Completer(); channel.writeMessage(bindRequest, completer : completer, futurePayload : this); @@ -67,9 +68,9 @@ class _QueueImpl implements Queue { if (exchange == null) { throw new ArgumentError("Exchange cannot be null"); } - // Fanout exchanges do not need to specify any keys. Use the default one if none is specified + // Fanout and headers exchanges do not need to specify any keys. Use the default one if none is specified if (routingKey == null || routingKey.isEmpty) { - if (exchange.type == ExchangeType.FANOUT) { + if (exchange.type == ExchangeType.FANOUT || exchange.type == ExchangeType.HEADERS) { routingKey = ""; } else { throw new ArgumentError("A routing key needs to be specified to unbind from this exchange type"); @@ -80,7 +81,8 @@ class _QueueImpl implements Queue { ..reserved_1 = 0 ..queue = name ..exchange = exchange.name - ..routingKey = routingKey; + ..routingKey = routingKey + ..arguments = arguments; Completer completer = new Completer(); channel.writeMessage(unbindRequest, completer : completer, futurePayload : this); diff --git a/lib/src/client/queue.dart b/lib/src/client/queue.dart index c0ea81b..35e51d5 100644 --- a/lib/src/client/queue.dart +++ b/lib/src/client/queue.dart @@ -37,7 +37,7 @@ abstract class Queue { /** * Bind this queue to [exchange] using [routingKey] and return a [Future] to the bound queue. * - * The [routingKey] parameter cannot be empty or null unless [exchange] is of type [ExchangeType.FANOUT]. + * The [routingKey] parameter cannot be empty or null unless [exchange] is of type [ExchangeType.FANOUT] or [ExchangeType.HEADERS]. * For any other [exchange] type, passing an empty or null [routingKey] will cause an [ArgumentError] * to be thrown. */ @@ -46,7 +46,7 @@ abstract class Queue { /** * Unbind this queue from [exchange] with [routingKey] and return a [Future] to the unbound queue. * - * The [routingKey] parameter cannot be empty or null unless [exchange] is of type [ExchangeType.FANOUT]. + * The [routingKey] parameter cannot be empty or null unless [exchange] is of type [ExchangeType.FANOUT] or [ExchangeType.HEADERS]. * For any other [exchange] type, passing an empty or null [routingKey] will cause an [ArgumentError] * to be thrown. */