diff --git a/src/main/java/reactor/rabbitmq/ConsumeOptions.java b/src/main/java/reactor/rabbitmq/ConsumeOptions.java index ca8b183..8a8351a 100644 --- a/src/main/java/reactor/rabbitmq/ConsumeOptions.java +++ b/src/main/java/reactor/rabbitmq/ConsumeOptions.java @@ -18,6 +18,8 @@ import com.rabbitmq.client.Channel; import com.rabbitmq.client.Delivery; +import java.util.Collections; +import java.util.Map; import reactor.core.publisher.FluxSink; import java.time.Duration; @@ -60,6 +62,11 @@ public class ConsumeOptions { private Consumer channelCallback = ch -> { }; + /** + * Arguments for the call to basic.consume. + */ + private Map arguments = Collections.emptyMap(); + public int getQos() { return qos; } @@ -129,4 +136,13 @@ public ConsumeOptions channelCallback(Consumer channelCallback) { public Consumer getChannelCallback() { return channelCallback; } + + public ConsumeOptions arguments(Map arguments) { + this.arguments = arguments; + return this; + } + + public Map getArguments() { + return arguments; + } } diff --git a/src/main/java/reactor/rabbitmq/Receiver.java b/src/main/java/reactor/rabbitmq/Receiver.java index a23e7d4..79a9a43 100644 --- a/src/main/java/reactor/rabbitmq/Receiver.java +++ b/src/main/java/reactor/rabbitmq/Receiver.java @@ -128,7 +128,15 @@ public Flux consumeNoAck(final String queue, ConsumeOptions options) { completeOnChannelShutdown(channel, emitter); - final String consumerTag = channel.basicConsume(queue, true, options.getConsumerTag(), deliverCallback, cancelCallback); + final String consumerTag = channel.basicConsume( + queue, + true, // auto-ack + options.getConsumerTag(), + false, // noLocal (not supported by RabbitMQ) + false, // not exclusive + options.getArguments(), + deliverCallback, + cancelCallback); AtomicBoolean cancelled = new AtomicBoolean(false); LOGGER.info("Consumer {} consuming from {} has been registered", consumerTag, queue); emitter.onDispose(() -> { @@ -217,7 +225,15 @@ public Flux consumeManualAck(final String queue, Consume completeOnChannelShutdown(channel, emitter); - final String consumerTag = channel.basicConsume(queue, false, options.getConsumerTag(), deliverCallback, cancelCallback); + final String consumerTag = channel.basicConsume( + queue, + false, // no auto-ack + options.getConsumerTag(), + false, // noLocal (not supported by RabbitMQ) + false, // not exclusive + options.getArguments(), + deliverCallback, + cancelCallback); AtomicBoolean cancelled = new AtomicBoolean(false); LOGGER.info("Consumer {} consuming from {} has been registered", consumerTag, queue); emitter.onDispose(() -> { diff --git a/src/test/java/reactor/rabbitmq/ConnectionRecoveryTests.java b/src/test/java/reactor/rabbitmq/ConnectionRecoveryTests.java index d9507d7..aefb0a9 100644 --- a/src/test/java/reactor/rabbitmq/ConnectionRecoveryTests.java +++ b/src/test/java/reactor/rabbitmq/ConnectionRecoveryTests.java @@ -170,9 +170,16 @@ void consumeAutoAckRetryOnAck() throws Exception { AtomicReference deliverCallbackAtomicReference = new AtomicReference<>(); when(mockChannel.basicConsume( - anyString(), anyBoolean(), anyString(), any(DeliverCallback.class), any(CancelCallback.class) + anyString(), // queue + anyBoolean(), // auto-ack + anyString(), // consumer tag + anyBoolean(), // noLocal (always false) + anyBoolean(), // exclusive (always false) + anyMap(), // arguments + any(DeliverCallback.class), + any(CancelCallback.class) )).thenAnswer(answer -> { - deliverCallbackAtomicReference.set(answer.getArgument(3)); + deliverCallbackAtomicReference.set(answer.getArgument(6)); consumerRegisteredLatch.countDown(); return "ctag"; }); @@ -228,9 +235,16 @@ void consumeManualAckRetryOnAck() throws Exception { AtomicReference deliverCallbackAtomicReference = new AtomicReference<>(); when(mockChannel.basicConsume( - anyString(), anyBoolean(), anyString(), any(DeliverCallback.class), any(CancelCallback.class) + anyString(), // queue + anyBoolean(), // auto-ack + anyString(), // consumer tag + anyBoolean(), // noLocal (always false) + anyBoolean(), // exclusive (always false) + anyMap(), // arguments + any(DeliverCallback.class), + any(CancelCallback.class) )).thenAnswer(answer -> { - deliverCallbackAtomicReference.set(answer.getArgument(3)); + deliverCallbackAtomicReference.set(answer.getArgument(6)); consumerRegisteredLatch.countDown(); return "ctag"; });