Skip to content

Commit

Permalink
Add single connection supplier helper
Browse files Browse the repository at this point in the history
And clarify connection supplier vs Mono<Connection> usage in
documentation.

References #93
  • Loading branch information
acogoluegnes committed Jun 27, 2019
1 parent 00a0ddd commit d77e1f7
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 28 deletions.
61 changes: 47 additions & 14 deletions src/docs/asciidoc/api-guide.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -423,15 +423,45 @@ exception can be customized by providing a `Predicate<Throwable>` in place of

This section covers advanced uses of the Reactor RabbitMQ API.

==== Creating a connection with a custom `Mono`
==== Customizing connection creation

It is possible to specify only a `ConnectionFactory` for `Sender/ReceiverOptions` and
let Reactor RabbitMQ create connection from this `ConnectionFactory`.
If you want more control over the creation of connections, you can use
`Sender/ReceiverOptions#connectionSupplier(ConnectionFactory)`. This is fine for most cases
and doesn't use any reactive API. Both `Sender` and `Receiver` use internally a `Mono<Connection>`
to open the connection only when needed. It is possible to provide this `Mono<Connection>`
through the appropriate `*Options` class:
let Reactor RabbitMQ create connection from this `ConnectionFactory`. Internally, Reactor
RabbitMQ will create a `Mono<Connection>` to perform its operations and the connection
will be created only when needed.

When the developer
lets Reactor RabbitMQ create a `Mono<Connection>`, the library will take responsibility
for the following actions for each instance of `Sender` and `Receiver`:

* using a cache to avoid creating several connections (by using `Mono#cache()`)
* making the `Mono<Connection>` register on `connectionSubscriptionScheduler` (with `Mono#subscribeOn`)
* closing the connection when `Sender/Receiver#close()` is closed

Reactor RabbitMQ provides 2 ways to have more control over the connection creation, e.g. to provide
a name or to connect to different nodes:

* using a connection supplier (simplest option, no Reactive API involved)
* using a custom `Mono<Connection>` (implies Reactive API but provide more control)

===== Creating a connection with a supplier

The following snippet shows how to create connections with a custom name:

[source,java,indent=0]
--------
include::{test-examples}/AdvancedFeatures.java[tag=connection-supplier]
--------
<1> Create and configure connection factory
<2> Create supplier that creates connection with a name
<3> Create supplier that creates connection with a name

When using a connection supplier, Reactor RabbitMQ will create a `Mono<Connection>` and will take care
of the operations mentioned above (caching, registering on a scheduler, and closing).

===== Creating a connection with a custom `Mono`

The following snippet shows how to provide custom `Mono<Connection>`:

[source,java,indent=0]
--------
Expand All @@ -442,7 +472,9 @@ include::{test-examples}/AdvancedFeatures.java[tag=connection-mono]
<3> Create `Mono` that creates connection with a name

Providing your own `Mono<Connection>` lets you take advantage of all the Reactor API
(e.g. for caching).
(e.g. for caching) but has some caveats: Reactor RabbitMQ will not cache the provided `Mono<Connection>`,
will not use them on a scheduler, and will not close them automatically. This is developer's responsibility
to take care of these actions if they make sense in their context.


==== Sharing the same connection between `Sender` and `Receiver`
Expand All @@ -451,18 +483,19 @@ Providing your own `Mono<Connection>` lets you take advantage of all the Reactor
only one or a few `Connection` instances to be able to use exclusive resources between a `Sender`
and a `Receiver` or simply to control the number of created connections.

Both `SenderOptions` and `ReceiverOptions` have a `connectionMono` method that can encapsulate
any logic to create the `Mono<Connection>` the `Sender` or `Receiver` will end up using. Reactor
RabbitMQ provides a way to share the exact same connection instance from a `Mono<Connection>`:
Both `SenderOptions` and `ReceiverOptions` have a `connectionSupplier` method that can encapsulate
any logic to create the `Connection` the `Sender` or `Receiver` will end up using through
a `Mono<Connection>`. Reactor
RabbitMQ provides a way to share the exact same connection instance between some `Sender` and `Receiver` instances:

[source,java,indent=0]
--------
include::{test-examples}/AdvancedFeatures.java[tag=shared-connection]
--------
<1> Create and configure connection factory
<2> Create `Mono` that re-uses the same connection instance
<3> Create sender with connection `Mono`
<4> Create receiver with connection `Mono`
<2> Create supplier that re-uses the same connection instance
<3> Create sender with connection supplier
<4> Create receiver with connection supplier

Be aware that closing the first `Sender` or `Receiver` will close the underlying
AMQP connection for all the others.
Expand Down
54 changes: 54 additions & 0 deletions src/main/java/reactor/rabbitmq/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@
import com.rabbitmq.client.ConnectionFactory;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

/**
* Set of utilities.
Expand All @@ -39,9 +44,58 @@ public static Mono<? extends Connection> singleConnectionMono(Callable<? extends
return Mono.fromCallable(() -> new IdempotentClosedConnection(supplier.call())).cache();
}

public static Utils.ExceptionFunction<ConnectionFactory, ? extends Connection> singleConnectionSupplier(
ConnectionFactory cf, Utils.ExceptionFunction<ConnectionFactory, ? extends Connection> supplier) {
return new SingleConnectionSupplier(() -> supplier.apply(cf));
}

public static Utils.ExceptionFunction<ConnectionFactory, ? extends Connection> singleConnectionSupplier(ConnectionFactory cf) {
return new SingleConnectionSupplier(() -> cf.newConnection());
}

public static Utils.ExceptionFunction<ConnectionFactory, ? extends Connection> singleConnectionSupplier(Callable<? extends Connection> supplier) {
return new SingleConnectionSupplier(supplier);
}

@FunctionalInterface
public interface ExceptionFunction<T, R> {

R apply(T t) throws Exception;
}

public static class SingleConnectionSupplier implements Utils.ExceptionFunction<ConnectionFactory, Connection> {

private final Callable<? extends Connection> creationAction;
private final Duration waitTimeout;

private final CountDownLatch latch = new CountDownLatch(1);
private AtomicBoolean created = new AtomicBoolean(false);
private AtomicReference<Connection> connection = new AtomicReference<>();

public SingleConnectionSupplier(Callable<? extends Connection> creationAction) {
this(creationAction, Duration.ofMinutes(5));
}

public SingleConnectionSupplier(Callable<? extends Connection> creationAction, Duration waitTimeout) {
this.creationAction = creationAction;
this.waitTimeout = waitTimeout;
}

@Override
public Connection apply(ConnectionFactory connectionFactory) throws Exception {
if (created.compareAndSet(false, true)) {
connection.set(new IdempotentClosedConnection(creationAction.call()));
latch.countDown();
} else {
boolean reachedZero = latch.await(waitTimeout.toMillis(), TimeUnit.MILLISECONDS);
if (!reachedZero) {
if (connection.get() != null) { // if lucky
return connection.get();
}
throw new RabbitFluxException("Reached timeout when waiting for connection to be created: " + waitTimeout);
}
}
return connection.get();
}
}
}
49 changes: 44 additions & 5 deletions src/test/java/reactor/rabbitmq/RabbitFluxTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,11 @@

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -1158,7 +1156,11 @@ public void partitions() throws Exception {
public void connectionMonoSharedConnection() throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.useNio();
Mono<? extends Connection> connectionMono = Utils.singleConnectionMono(connectionFactory, cf -> cf.newConnection());
AtomicInteger callCount = new AtomicInteger(0);
Mono<? extends Connection> connectionMono = Utils.singleConnectionMono(connectionFactory, cf -> {
callCount.incrementAndGet();
return cf.newConnection();
});

sender = createSender(new SenderOptions().connectionMono(connectionMono));
receiver = createReceiver(new ReceiverOptions().connectionMono(connectionMono));
Expand All @@ -1167,6 +1169,39 @@ public void connectionMonoSharedConnection() throws Exception {
.block().getQueue();

sendAndReceiveMessages(connectionQueue);
assertThat(callCount).hasValue(1);
connectionMono.block().close();
}

@Test
public void connectionSupplierConnectionIsSharedAndClosed() throws Exception {
AtomicInteger callCount = new AtomicInteger(0);
ConnectionFactory connectionFactory = new ConnectionFactory() {
@Override
public Connection newConnection() throws IOException, TimeoutException {
callCount.incrementAndGet();
return super.newConnection();
}
};
connectionFactory.useNio();

Collection<Utils.ExceptionFunction<ConnectionFactory, ? extends Connection>> connectionSuppliers = new ArrayList<>();
connectionSuppliers.add(Utils.singleConnectionSupplier(connectionFactory, cf -> cf.newConnection()));
connectionSuppliers.add(Utils.singleConnectionSupplier(connectionFactory));
connectionSuppliers.add(Utils.singleConnectionSupplier(() -> connectionFactory.newConnection()));

for (Utils.ExceptionFunction<ConnectionFactory, ? extends Connection> connectionSupplier : connectionSuppliers) {
try (Sender s = createSender(new SenderOptions().connectionSupplier(connectionSupplier));
Receiver r = createReceiver(new ReceiverOptions().connectionSupplier(connectionSupplier))) {
String connectionQueue = s.declare(QueueSpecification.queue().durable(false).autoDelete(true).exclusive(true))
.block().getQueue();

sendAndReceiveMessages(connectionQueue, s, r);
}
assertThat(callCount).hasValue(1);
assertThat(connectionSupplier.apply(null).isOpen()).isFalse();
callCount.set(0);
}
}

@Test
Expand Down Expand Up @@ -1228,6 +1263,10 @@ public void creatingNonExistentPassiveExchangeResultsInError() {
}

private void sendAndReceiveMessages(String queue) throws Exception {
sendAndReceiveMessages(queue, this.sender, this.receiver);
}

private void sendAndReceiveMessages(String queue, Sender sender, Receiver receiver) throws Exception {
int nbMessages = 10;
CountDownLatch latch = new CountDownLatch(nbMessages);
Disposable subscriber = receiver.consumeAutoAck(queue).subscribe(delivery -> latch.countDown());
Expand Down
36 changes: 27 additions & 9 deletions src/test/java/reactor/rabbitmq/docs/AdvancedFeatures.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018 Pivotal Software Inc, All Rights Reserved.
* Copyright (c) 2018-2019 Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -32,35 +32,53 @@
@SuppressWarnings("unused")
public class AdvancedFeatures {

void connectionSupplier() {
// tag::connection-supplier[]
ConnectionFactory connectionFactory = new ConnectionFactory(); // <1>
connectionFactory.useNio();

Sender sender = RabbitFlux.createSender(new SenderOptions()
.connectionFactory(connectionFactory)
.connectionSupplier(cf -> cf.newConnection("sender")) // <2>
);

Receiver receiver = RabbitFlux.createReceiver(new ReceiverOptions()
.connectionFactory(connectionFactory)
.connectionSupplier(cf -> cf.newConnection("receiver")) // <3>
);
// end::connection-supplier[]
}

void connectionMono() {
// tag::connection-mono[]
ConnectionFactory connectionFactory = new ConnectionFactory(); // <1>
ConnectionFactory connectionFactory = new ConnectionFactory(); // <1>
connectionFactory.useNio();

Sender sender = RabbitFlux.createSender(new SenderOptions()
.connectionMono(
Mono.fromCallable(() -> connectionFactory.newConnection("sender"))) // <2>
Mono.fromCallable(() -> connectionFactory.newConnection("sender")).cache()) // <2>
);
Receiver receiver = RabbitFlux.createReceiver(new ReceiverOptions()
.connectionMono(
Mono.fromCallable(() -> connectionFactory.newConnection("receiver"))) // <3>
Mono.fromCallable(() -> connectionFactory.newConnection("receiver")).cache()) // <3>
);
// end::connection-mono[]
}

void sharedConnection() {
// tag::shared-connection[]
ConnectionFactory connectionFactory = new ConnectionFactory(); // <1>
ConnectionFactory connectionFactory = new ConnectionFactory(); // <1>
connectionFactory.useNio();
Mono<? extends Connection> connectionMono = Utils.singleConnectionMono( // <2>
connectionFactory, cf -> cf.newConnection()
Utils.ExceptionFunction<ConnectionFactory, ? extends Connection> connectionsupplier =
Utils.singleConnectionSupplier( // <2>
connectionFactory, cf -> cf.newConnection()
);

Sender sender = RabbitFlux.createSender(
new SenderOptions().connectionMono(connectionMono) // <3>
new SenderOptions().connectionSupplier(connectionsupplier) // <3>
);
Receiver receiver = RabbitFlux.createReceiver(
new ReceiverOptions().connectionMono(connectionMono) // <4>
new ReceiverOptions().connectionSupplier(connectionsupplier) // <4>
);
// end::shared-connection[]
}
Expand Down

0 comments on commit d77e1f7

Please sign in to comment.