Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ISSUE-174 Document RPC limitations #175

Merged
merged 1 commit into from
May 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/docs/asciidoc/api-guide.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,12 @@ include::{test-examples}/ApiGuideSender.java[tag=static-import]
include::{test-examples}/ApiGuideSender.java[tag=resource-deletion,indent=0]
-------

[NOTE]
====
Warning: These methods relies on RPCs. As AMQP 0.9.1 protocol does not use a correlation ID for requests, a lock
is being used to prevent concurrent RPCs, making this publisher potentially blocking.
====

==== Reliable publishing with publisher confirms

`Sender` offers also the `sendWithPublishConfirms` method to send
Expand Down
145 changes: 144 additions & 1 deletion src/main/java/reactor/rabbitmq/Sender.java
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,9 @@ public RpcClient rpcClient(String exchange, String routingKey, Supplier<String>
/**
* Declare a queue following the specification.
*
* Warning: This method relies on RPC. As AMQP 0.9.1 protocol does not use a correlation ID for requests, a lock
* is being used to prevent concurrent RPCs, making this publisher potentially blocking.
*
* @param specification the specification of the queue
* @return a mono wrapping the result of the declaration
* @see QueueSpecification
Expand All @@ -340,6 +343,9 @@ public Mono<AMQP.Queue.DeclareOk> declare(QueueSpecification specification) {
/**
* Declare a queue following the specification and the resource management options.
*
* Warning: This method relies on RPC. As AMQP 0.9.1 protocol does not use a correlation ID for requests, a lock
* is being used to prevent concurrent RPCs, making this publisher potentially blocking.
*
* @param specification the specification of the queue
* @param options options for resource management
* @return a mono wrapping the result of the declaration
Expand All @@ -353,6 +359,9 @@ public Mono<AMQP.Queue.DeclareOk> declare(QueueSpecification specification, @Nul
/**
* Declare a queue following the specification.
*
* Warning: This method relies on RPC. As AMQP 0.9.1 protocol does not use a correlation ID for requests, a lock
* is being used to prevent concurrent RPCs, making this publisher potentially blocking.
*
* @param specification the specification of the queue
* @return a mono wrapping the result of the declaration
* @see QueueSpecification
Expand All @@ -364,6 +373,9 @@ public Mono<AMQP.Queue.DeclareOk> declareQueue(QueueSpecification specification)
/**
* Declare a queue following the specification and the resource management options.
*
* Warning: This method relies on RPC. As AMQP 0.9.1 protocol does not use a correlation ID for requests, a lock
* is being used to prevent concurrent RPCs, making this publisher potentially blocking.
*
* @param specification the specification of the queue
* @param options options for resource management
* @return a mono wrapping the result of the declaration
Expand Down Expand Up @@ -409,26 +421,62 @@ private Mono<? extends Channel> getChannelMonoForResourceManagement(ResourceMana
options.getChannelMono() : this.resourceManagementChannelMono;
}

/**
* Delete a queue.
*
* Warning: This method relies on RPC. As AMQP 0.9.1 protocol does not use a correlation ID for requests, a lock
* is being used to prevent concurrent RPCs, making this publisher potentially blocking.
*/
public Mono<AMQP.Queue.DeleteOk> delete(QueueSpecification specification) {
return this.delete(specification, false, false);
}

/**
* Delete a queue.
*
* Warning: This method relies on RPC. As AMQP 0.9.1 protocol does not use a correlation ID for requests, a lock
* is being used to prevent concurrent RPCs, making this publisher potentially blocking.
*/
public Mono<AMQP.Queue.DeleteOk> delete(QueueSpecification specification, @Nullable ResourceManagementOptions options) {
return this.delete(specification, false, false, options);
}

/**
* Delete a queue.
*
* Warning: This method relies on RPC. As AMQP 0.9.1 protocol does not use a correlation ID for requests, a lock
* is being used to prevent concurrent RPCs, making this publisher potentially blocking.
*/
public Mono<AMQP.Queue.DeleteOk> delete(QueueSpecification specification, boolean ifUnused, boolean ifEmpty) {
return this.deleteQueue(specification, ifUnused, ifEmpty);
}

/**
* Delete a queue.
*
* Warning: This method relies on RPC. As AMQP 0.9.1 protocol does not use a correlation ID for requests, a lock
* is being used to prevent concurrent RPCs, making this publisher potentially blocking.
*/
public Mono<AMQP.Queue.DeleteOk> delete(QueueSpecification specification, boolean ifUnused, boolean ifEmpty, @Nullable ResourceManagementOptions options) {
return this.deleteQueue(specification, ifUnused, ifEmpty, options);
}

/**
* Delete a queue.
*
* Warning: This method relies on RPC. As AMQP 0.9.1 protocol does not use a correlation ID for requests, a lock
* is being used to prevent concurrent RPCs, making this publisher potentially blocking.
*/
public Mono<AMQP.Queue.DeleteOk> deleteQueue(QueueSpecification specification, boolean ifUnused, boolean ifEmpty) {
return this.deleteQueue(specification, ifUnused, ifEmpty, null);
}

/**
* Delete a queue.
*
* Warning: This method relies on RPC. As AMQP 0.9.1 protocol does not use a correlation ID for requests, a lock
* is being used to prevent concurrent RPCs, making this publisher potentially blocking.
*/
public Mono<AMQP.Queue.DeleteOk> deleteQueue(QueueSpecification specification, boolean ifUnused, boolean ifEmpty, @Nullable ResourceManagementOptions options) {
Mono<? extends Channel> channelMono = getChannelMonoForResourceManagement(options);
AMQP.Queue.Delete delete = new AMQImpl.Queue.Delete.Builder()
Expand All @@ -448,18 +496,41 @@ public Mono<AMQP.Queue.DeleteOk> deleteQueue(QueueSpecification specification, b
.publishOn(resourceManagementScheduler);
}

/**
* Declare an exchange.
*
* Warning: This method relies on RPC. As AMQP 0.9.1 protocol does not use a correlation ID for requests, a lock
* is being used to prevent concurrent RPCs, making this publisher potentially blocking.
*/
public Mono<AMQP.Exchange.DeclareOk> declare(ExchangeSpecification specification) {
return this.declareExchange(specification, null);
}

/**
* Declare an exchange.
*
* Warning: This method relies on RPC. As AMQP 0.9.1 protocol does not use a correlation ID for requests, a lock
* is being used to prevent concurrent RPCs, making this publisher potentially blocking.
*/
public Mono<AMQP.Exchange.DeclareOk> declare(ExchangeSpecification specification, @Nullable ResourceManagementOptions options) {
return this.declareExchange(specification, options);
}

/**
* Declare an exchange.
*
* Warning: This method relies on RPC. As AMQP 0.9.1 protocol does not use a correlation ID for requests, a lock
* is being used to prevent concurrent RPCs, making this publisher potentially blocking.
*/
public Mono<AMQP.Exchange.DeclareOk> declareExchange(ExchangeSpecification specification) {
return this.declareExchange(specification, null);
}

/**
* Declare an exchange.
*
* Warning: This method relies on RPC. As AMQP 0.9.1 protocol does not use a correlation ID for requests, a lock
* is being used to prevent concurrent RPCs, making this publisher potentially blocking.
*/
public Mono<AMQP.Exchange.DeclareOk> declareExchange(ExchangeSpecification specification, @Nullable ResourceManagementOptions options) {
Mono<? extends Channel> channelMono = getChannelMonoForResourceManagement(options);
AMQP.Exchange.Declare declare = new AMQImpl.Exchange.Declare.Builder()
Expand All @@ -482,26 +553,62 @@ public Mono<AMQP.Exchange.DeclareOk> declareExchange(ExchangeSpecification speci
.publishOn(resourceManagementScheduler);
}

/**
* Delete an exchange.
*
* Warning: This method relies on RPC. As AMQP 0.9.1 protocol does not use a correlation ID for requests, a lock
* is being used to prevent concurrent RPCs, making this publisher potentially blocking.
*/
public Mono<AMQP.Exchange.DeleteOk> delete(ExchangeSpecification specification) {
return this.delete(specification, false);
}

/**
* Delete an exchange.
*
* Warning: This method relies on RPC. As AMQP 0.9.1 protocol does not use a correlation ID for requests, a lock
* is being used to prevent concurrent RPCs, making this publisher potentially blocking.
*/
public Mono<AMQP.Exchange.DeleteOk> delete(ExchangeSpecification specification, @Nullable ResourceManagementOptions options) {
return this.delete(specification, false, options);
}

/**
* Delete an exchange.
*
* Warning: This method relies on RPC. As AMQP 0.9.1 protocol does not use a correlation ID for requests, a lock
* is being used to prevent concurrent RPCs, making this publisher potentially blocking.
*/
public Mono<AMQP.Exchange.DeleteOk> delete(ExchangeSpecification specification, boolean ifUnused) {
return this.deleteExchange(specification, ifUnused);
}

/**
* Delete an exchange.
*
* Warning: This method relies on RPC. As AMQP 0.9.1 protocol does not use a correlation ID for requests, a lock
* is being used to prevent concurrent RPCs, making this publisher potentially blocking.
*/
public Mono<AMQP.Exchange.DeleteOk> delete(ExchangeSpecification specification, boolean ifUnused, @Nullable ResourceManagementOptions options) {
return this.deleteExchange(specification, ifUnused, options);
}

/**
* Delete an exchange.
*
* Warning: This method relies on RPC. As AMQP 0.9.1 protocol does not use a correlation ID for requests, a lock
* is being used to prevent concurrent RPCs, making this publisher potentially blocking.
*/
public Mono<AMQP.Exchange.DeleteOk> deleteExchange(ExchangeSpecification specification, boolean ifUnused) {
return this.deleteExchange(specification, ifUnused, null);
}

/**
* Delete an exchange.
*
* Warning: This method relies on RPC. As AMQP 0.9.1 protocol does not use a correlation ID for requests, a lock
* is being used to prevent concurrent RPCs, making this publisher potentially blocking.
*/
public Mono<AMQP.Exchange.DeleteOk> deleteExchange(ExchangeSpecification specification, boolean ifUnused, @Nullable ResourceManagementOptions options) {
Mono<? extends Channel> channelMono = getChannelMonoForResourceManagement(options);
AMQP.Exchange.Delete delete = new AMQImpl.Exchange.Delete.Builder()
Expand All @@ -524,6 +631,9 @@ public Mono<AMQP.Exchange.DeleteOk> deleteExchange(ExchangeSpecification specifi
* <p>
* Alias of {@link #unbind(BindingSpecification)}.
*
* Warning: This method relies on RPC. As AMQP 0.9.1 protocol does not use a correlation ID for requests, a lock
* is being used to prevent concurrent RPCs, making this publisher potentially blocking.
*
* @param specification the unbinding specification
* @return the result of the operation
* @since 1.4.1
Expand All @@ -537,6 +647,9 @@ public Mono<AMQP.Queue.UnbindOk> unbindQueue(BindingSpecification specification)
* <p>
* Alias of {@link #unbind(BindingSpecification, ResourceManagementOptions)}.
*
* Warning: This method relies on RPC. As AMQP 0.9.1 protocol does not use a correlation ID for requests, a lock
* is being used to prevent concurrent RPCs, making this publisher potentially blocking.
*
* @param specification the unbinding specification
* @param options options to control the operation, e.g. channel to use
* @return the result of the operation
Expand All @@ -551,6 +664,9 @@ public Mono<AMQP.Queue.UnbindOk> unbindQueue(BindingSpecification specification,
* <p>
* Alias of {@link #unbindQueue(BindingSpecification)}.
*
* Warning: This method relies on RPC. As AMQP 0.9.1 protocol does not use a correlation ID for requests, a lock
* is being used to prevent concurrent RPCs, making this publisher potentially blocking.
*
* @param specification the unbinding specification
* @return the result of the operation
*/
Expand All @@ -563,6 +679,9 @@ public Mono<AMQP.Queue.UnbindOk> unbind(BindingSpecification specification) {
* <p>
* Alias of {@link #unbindQueue(BindingSpecification, ResourceManagementOptions)}.
*
* Warning: This method relies on RPC. As AMQP 0.9.1 protocol does not use a correlation ID for requests, a lock
* is being used to prevent concurrent RPCs, making this publisher potentially blocking.
*
* @param specification the unbinding specification
* @param options options to control the operation, e.g. channel to use
* @return the result of the operation
Expand Down Expand Up @@ -590,6 +709,9 @@ public Mono<AMQP.Queue.UnbindOk> unbind(BindingSpecification specification, @Nul
/**
* Unbind an exchange from another exchange.
*
* Warning: This method relies on RPC. As AMQP 0.9.1 protocol does not use a correlation ID for requests, a lock
* is being used to prevent concurrent RPCs, making this publisher potentially blocking.
*
* @param specification the unbinding specification
* @return the result of the operation
* @since 1.4.1
Expand All @@ -601,6 +723,9 @@ public Mono<AMQP.Exchange.UnbindOk> unbindExchange(BindingSpecification specific
/**
* Unbind an exchange from another exchange.
*
* Warning: This method relies on RPC. As AMQP 0.9.1 protocol does not use a correlation ID for requests, a lock
* is being used to prevent concurrent RPCs, making this publisher potentially blocking.
*
* @param specification the unbinding specification
* @param options options to control the operation, e.g. channel to use
* @return the result of the operation
Expand Down Expand Up @@ -631,6 +756,9 @@ public Mono<AMQP.Exchange.UnbindOk> unbindExchange(BindingSpecification specific
* <p>
* Alias of {@link #bind(BindingSpecification)}
*
* Warning: This method relies on RPC. As AMQP 0.9.1 protocol does not use a correlation ID for requests, a lock
* is being used to prevent concurrent RPCs, making this publisher potentially blocking.
*
* @param specification the binding specification
* @return the result of the operation
* @since 1.4.1
Expand All @@ -644,6 +772,9 @@ public Mono<AMQP.Queue.BindOk> bindQueue(BindingSpecification specification) {
* <p>
* Alias of {@link #bind(BindingSpecification, ResourceManagementOptions)}
*
* Warning: This method relies on RPC. As AMQP 0.9.1 protocol does not use a correlation ID for requests, a lock
* is being used to prevent concurrent RPCs, making this publisher potentially blocking.
*
* @param specification the binding specification
* @param options options to control the operation, e.g. channel to use
* @return the result of the operation
Expand All @@ -658,6 +789,9 @@ public Mono<AMQP.Queue.BindOk> bindQueue(BindingSpecification specification, @Nu
* <p>
* Alias of {@link #bindQueue(BindingSpecification)}
*
* Warning: This method relies on RPC. As AMQP 0.9.1 protocol does not use a correlation ID for requests, a lock
* is being used to prevent concurrent RPCs, making this publisher potentially blocking.
*
* @param specification the binding specification
* @return the result of the operation
* @since 1.4.1
Expand All @@ -671,6 +805,9 @@ public Mono<AMQP.Queue.BindOk> bind(BindingSpecification specification) {
* <p>
* Alias of {@link #bindQueue(BindingSpecification, ResourceManagementOptions)}
*
* Warning: This method relies on RPC. As AMQP 0.9.1 protocol does not use a correlation ID for requests, a lock
* is being used to prevent concurrent RPCs, making this publisher potentially blocking.
*
* @param specification the binding specification
* @param options options to control the operation, e.g. channel to use
* @return the result of the operation
Expand Down Expand Up @@ -700,6 +837,9 @@ public Mono<AMQP.Queue.BindOk> bind(BindingSpecification specification, @Nullabl
/**
* Bind an exchange to another exchange.
*
* Warning: This method relies on RPC. As AMQP 0.9.1 protocol does not use a correlation ID for requests, a lock
* is being used to prevent concurrent RPCs, making this publisher potentially blocking.
*
* @param specification the binding specification
* @return the result of the operation
* @since 1.4.1
Expand All @@ -711,6 +851,9 @@ public Mono<AMQP.Exchange.BindOk> bindExchange(BindingSpecification specificatio
/**
* Bind an exchange to another exchange.
*
* Warning: This method relies on RPC. As AMQP 0.9.1 protocol does not use a correlation ID for requests, a lock
* is being used to prevent concurrent RPCs, making this publisher potentially blocking.
*
* @param specification the binding specification
* @param options options to control the operation, e.g. channel to use
* @return the result of the operation
Expand Down