Skip to content

Commit

Permalink
Add arguments in ConsumeOptions
Browse files Browse the repository at this point in the history
For basic.consume. Allows to specify arguments e.g. x-stream-offset.

Fixes #165
  • Loading branch information
acogoluegnes committed Oct 28, 2021
1 parent 4c560fb commit a515660
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 6 deletions.
16 changes: 16 additions & 0 deletions src/main/java/reactor/rabbitmq/ConsumeOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Delivery;
import java.util.Collections;
import java.util.Map;
import reactor.core.publisher.FluxSink;

import java.time.Duration;
Expand Down Expand Up @@ -60,6 +62,11 @@ public class ConsumeOptions {
private Consumer<Channel> channelCallback = ch -> {
};

/**
* Arguments for the call to <code>basic.consume</code>.
*/
private Map<String, Object> arguments = Collections.emptyMap();

public int getQos() {
return qos;
}
Expand Down Expand Up @@ -129,4 +136,13 @@ public ConsumeOptions channelCallback(Consumer<Channel> channelCallback) {
public Consumer<Channel> getChannelCallback() {
return channelCallback;
}

public ConsumeOptions arguments(Map<String, Object> arguments) {
this.arguments = arguments;
return this;
}

public Map<String, Object> getArguments() {
return arguments;
}
}
20 changes: 18 additions & 2 deletions src/main/java/reactor/rabbitmq/Receiver.java
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,15 @@ public Flux<Delivery> consumeNoAck(final String queue, ConsumeOptions options) {

completeOnChannelShutdown(channel, emitter);

final String consumerTag = channel.basicConsume(queue, true, options.getConsumerTag(), deliverCallback, cancelCallback);
final String consumerTag = channel.basicConsume(
queue,
true, // auto-ack
options.getConsumerTag(),
false, // noLocal (not supported by RabbitMQ)
false, // not exclusive
options.getArguments(),
deliverCallback,
cancelCallback);
AtomicBoolean cancelled = new AtomicBoolean(false);
LOGGER.info("Consumer {} consuming from {} has been registered", consumerTag, queue);
emitter.onDispose(() -> {
Expand Down Expand Up @@ -217,7 +225,15 @@ public Flux<AcknowledgableDelivery> consumeManualAck(final String queue, Consume

completeOnChannelShutdown(channel, emitter);

final String consumerTag = channel.basicConsume(queue, false, options.getConsumerTag(), deliverCallback, cancelCallback);
final String consumerTag = channel.basicConsume(
queue,
false, // no auto-ack
options.getConsumerTag(),
false, // noLocal (not supported by RabbitMQ)
false, // not exclusive
options.getArguments(),
deliverCallback,
cancelCallback);
AtomicBoolean cancelled = new AtomicBoolean(false);
LOGGER.info("Consumer {} consuming from {} has been registered", consumerTag, queue);
emitter.onDispose(() -> {
Expand Down
22 changes: 18 additions & 4 deletions src/test/java/reactor/rabbitmq/ConnectionRecoveryTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,16 @@ void consumeAutoAckRetryOnAck() throws Exception {
AtomicReference<DeliverCallback> deliverCallbackAtomicReference = new AtomicReference<>();

when(mockChannel.basicConsume(
anyString(), anyBoolean(), anyString(), any(DeliverCallback.class), any(CancelCallback.class)
anyString(), // queue
anyBoolean(), // auto-ack
anyString(), // consumer tag
anyBoolean(), // noLocal (always false)
anyBoolean(), // exclusive (always false)
anyMap(), // arguments
any(DeliverCallback.class),
any(CancelCallback.class)
)).thenAnswer(answer -> {
deliverCallbackAtomicReference.set(answer.getArgument(3));
deliverCallbackAtomicReference.set(answer.getArgument(6));
consumerRegisteredLatch.countDown();
return "ctag";
});
Expand Down Expand Up @@ -228,9 +235,16 @@ void consumeManualAckRetryOnAck() throws Exception {
AtomicReference<DeliverCallback> deliverCallbackAtomicReference = new AtomicReference<>();

when(mockChannel.basicConsume(
anyString(), anyBoolean(), anyString(), any(DeliverCallback.class), any(CancelCallback.class)
anyString(), // queue
anyBoolean(), // auto-ack
anyString(), // consumer tag
anyBoolean(), // noLocal (always false)
anyBoolean(), // exclusive (always false)
anyMap(), // arguments
any(DeliverCallback.class),
any(CancelCallback.class)
)).thenAnswer(answer -> {
deliverCallbackAtomicReference.set(answer.getArgument(3));
deliverCallbackAtomicReference.set(answer.getArgument(6));
consumerRegisteredLatch.countDown();
return "ctag";
});
Expand Down

0 comments on commit a515660

Please sign in to comment.