*
* @param hashSlotCacheTTL the TTL of the hash slot cache, in millis
+ * @return fluent self
*/
public RedisOptions setHashSlotCacheTTL(long hashSlotCacheTTL) {
this.hashSlotCacheTTL = hashSlotCacheTTL;
return this;
}
+ /**
+ * Returns whether automatic failover is enabled. This only makes sense for sentinel clients
+ * with role of {@link RedisRole#MASTER} and is ignored otherwise.
+ *
+ * If enabled, the sentinel client will additionally create a connection to one sentinel node
+ * and watch for failover events. When new master is elected, all connections to the old master
+ * are automatically closed and new connections to the new master are created. Note that
+ * these new connections will not have the same event handlers
+ * ({@link RedisConnection#handler(Handler) handler()},
+ * {@link RedisConnection#exceptionHandler(Handler) exceptionHandler()} and
+ * {@link RedisConnection#endHandler(Handler) endHandler()}), will not be
+ * in the same streaming mode ({@link RedisConnection#pause() pause()},
+ * {@link RedisConnection#resume() resume()} and {@link RedisConnection#fetch(long) fetch()}),
+ * and will not watch the same subscriptions ({@code SUBSCRIBE}, {@code PSUBSCRIBE}, etc.)
+ * as the old ones. In other words, automatic failover makes sense for connections executing
+ * regular commands, but not for connections used to subscribe to Redis pub/sub channels.
+ *
+ * Note that there is a brief period of time between the old master failing and the new
+ * master being elected when the existing connections will temporarily fail all operations.
+ * After the new master is elected, the connections will automatically fail over and
+ * start working again.
+ *
+ * @return whether automatic failover is enabled
+ */
+ public boolean isAutoFailover() {
+ return autoFailover;
+ }
+
+ /**
+ * Returns whether automatic failover is enabled. This only makes sense for sentinel clients
+ * with role of {@link RedisRole#MASTER} and is ignored otherwise.
+ *
+ * If enabled, the sentinel client will additionally create a connection to one sentinel node
+ * and watch for failover events. When new master is elected, all connections to the old master
+ * are automatically closed and new connections to the new master are created. Note that
+ * these new connections will not have the same event handlers
+ * ({@link RedisConnection#handler(Handler) handler()},
+ * {@link RedisConnection#exceptionHandler(Handler) exceptionHandler()} and
+ * {@link RedisConnection#endHandler(Handler) endHandler()}), will not be
+ * in the same streaming mode ({@link RedisConnection#pause() pause()},
+ * {@link RedisConnection#resume() resume()} and {@link RedisConnection#fetch(long) fetch()}),
+ * and will not watch the same subscriptions ({@code SUBSCRIBE}, {@code PSUBSCRIBE}, etc.)
+ * as the old ones. In other words, automatic failover makes sense for connections executing
+ * regular commands, but not for connections used to subscribe to Redis pub/sub channels.
+ *
+ * Note that there is a brief period of time between the old master failing and the new
+ * master being elected when the existing connections will temporarily fail all operations.
+ * After the new master is elected, the connections will automatically fail over and
+ * start working again.
+ *
+ * @param autoFailover whether automatic failover should be enabled
+ * @return fluent self
+ */
+ public RedisOptions setAutoFailover(boolean autoFailover) {
+ this.autoFailover = autoFailover;
+ return this;
+ }
+
/**
* Converts this object to JSON notation.
*
diff --git a/src/main/java/io/vertx/redis/client/RedisSentinelConnectOptions.java b/src/main/java/io/vertx/redis/client/RedisSentinelConnectOptions.java
index 893dbaa7..bd47bb18 100644
--- a/src/main/java/io/vertx/redis/client/RedisSentinelConnectOptions.java
+++ b/src/main/java/io/vertx/redis/client/RedisSentinelConnectOptions.java
@@ -17,7 +17,9 @@
import io.vertx.codegen.annotations.DataObject;
import io.vertx.codegen.json.annotations.JsonGen;
+import io.vertx.core.Handler;
import io.vertx.core.json.JsonObject;
+import io.vertx.redis.client.impl.RedisSentinelConnection;
import java.util.List;
@@ -27,6 +29,7 @@ public class RedisSentinelConnectOptions extends RedisConnectOptions {
private RedisRole role;
private String masterName;
+ private boolean autoFailover;
public RedisSentinelConnectOptions() {
super();
@@ -38,12 +41,14 @@ public RedisSentinelConnectOptions(RedisOptions options) {
super(options);
setRole(options.getRole());
setMasterName(options.getMasterName());
+ setAutoFailover(options.isAutoFailover());
}
public RedisSentinelConnectOptions(RedisSentinelConnectOptions other) {
super(other);
setRole(other.getRole());
setMasterName(other.getMasterName());
+ setAutoFailover(other.isAutoFailover());
}
public RedisSentinelConnectOptions(JsonObject json) {
@@ -66,7 +71,7 @@ public RedisRole getRole() {
* @param role the role
* @return fluent self
*/
- public RedisConnectOptions setRole(RedisRole role) {
+ public RedisSentinelConnectOptions setRole(RedisRole role) {
this.role = role;
return this;
}
@@ -86,11 +91,69 @@ public String getMasterName() {
* @param masterName the master set name
* @return fluent self
*/
- public RedisConnectOptions setMasterName(String masterName) {
+ public RedisSentinelConnectOptions setMasterName(String masterName) {
this.masterName = masterName;
return this;
}
+ /**
+ * Returns whether automatic failover is enabled. This only makes sense for sentinel clients
+ * with role of {@link RedisRole#MASTER} and is ignored otherwise.
+ *
+ * If enabled, the sentinel client will additionally create a connection to one sentinel node
+ * and watch for failover events. When new master is elected, all connections to the old master
+ * are automatically closed and new connections to the new master are created. Note that
+ * these new connections will not have the same event handlers
+ * ({@link RedisConnection#handler(Handler) handler()},
+ * {@link RedisConnection#exceptionHandler(Handler) exceptionHandler()} and
+ * {@link RedisConnection#endHandler(Handler) endHandler()}), will not be
+ * in the same streaming mode ({@link RedisConnection#pause() pause()},
+ * {@link RedisConnection#resume() resume()} and {@link RedisConnection#fetch(long) fetch()}),
+ * and will not watch the same subscriptions ({@code SUBSCRIBE}, {@code PSUBSCRIBE}, etc.)
+ * as the old ones. In other words, automatic failover makes sense for connections executing
+ * regular commands, but not for connections used to subscribe to Redis pub/sub channels.
+ *
+ * Note that there is a brief period of time between the old master failing and the new
+ * master being elected when the existing connections will temporarily fail all operations.
+ * After the new master is elected, the connections will automatically fail over and
+ * start working again.
+ *
+ * @return whether automatic failover is enabled
+ */
+ public boolean isAutoFailover() {
+ return autoFailover;
+ }
+
+ /**
+ * Returns whether automatic failover is enabled. This only makes sense for sentinel clients
+ * with role of {@link RedisRole#MASTER} and is ignored otherwise.
+ *
+ * If enabled, the sentinel client will additionally create a connection to one sentinel node
+ * and watch for failover events. When new master is elected, all connections to the old master
+ * are automatically closed and new connections to the new master are created. Note that
+ * these new connections will not have the same event handlers
+ * ({@link RedisConnection#handler(Handler) handler()},
+ * {@link RedisConnection#exceptionHandler(Handler) exceptionHandler()} and
+ * {@link RedisConnection#endHandler(Handler) endHandler()}), will not be
+ * in the same streaming mode ({@link RedisConnection#pause() pause()},
+ * {@link RedisConnection#resume() resume()} and {@link RedisConnection#fetch(long) fetch()}),
+ * and will not watch the same subscriptions ({@code SUBSCRIBE}, {@code PSUBSCRIBE}, etc.)
+ * as the old ones. In other words, automatic failover makes sense for connections executing
+ * regular commands, but not for connections used to subscribe to Redis pub/sub channels.
+ *
+ * Note that there is a brief period of time between the old master failing and the new
+ * master being elected when the existing connections will temporarily fail all operations.
+ * After the new master is elected, the connections will automatically fail over and
+ * start working again.
+ *
+ * @param autoFailover whether automatic failover should be enabled
+ * @return fluent self
+ */
+ public RedisSentinelConnectOptions setAutoFailover(boolean autoFailover) {
+ this.autoFailover = autoFailover;
+ return this;
+ }
+
@Override
public RedisSentinelConnectOptions setMaxNestedArrays(int maxNestedArrays) {
return (RedisSentinelConnectOptions) super.setMaxNestedArrays(maxNestedArrays);
diff --git a/src/main/java/io/vertx/redis/client/impl/PooledRedisConnection.java b/src/main/java/io/vertx/redis/client/impl/PooledRedisConnection.java
index fb04d95e..4152e7e9 100644
--- a/src/main/java/io/vertx/redis/client/impl/PooledRedisConnection.java
+++ b/src/main/java/io/vertx/redis/client/impl/PooledRedisConnection.java
@@ -88,9 +88,10 @@ public RedisConnection endHandler(@Nullable Handler endHandler) {
public Future close() {
if (connection.reset()) {
lease.recycle();
- }
- if (metrics != null) {
- metrics.end(metric, true);
+
+ if (metrics != null) {
+ metrics.end(metric, true);
+ }
}
return Future.succeededFuture();
diff --git a/src/main/java/io/vertx/redis/client/impl/RedisClusterClient.java b/src/main/java/io/vertx/redis/client/impl/RedisClusterClient.java
index 8b807277..8aab6c2f 100644
--- a/src/main/java/io/vertx/redis/client/impl/RedisClusterClient.java
+++ b/src/main/java/io/vertx/redis/client/impl/RedisClusterClient.java
@@ -15,13 +15,23 @@
*/
package io.vertx.redis.client.impl;
-import io.vertx.core.*;
-import io.vertx.core.internal.ContextInternal;
+import io.vertx.core.AsyncResult;
+import io.vertx.core.Future;
+import io.vertx.core.Handler;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
import io.vertx.core.internal.logging.Logger;
import io.vertx.core.internal.logging.LoggerFactory;
import io.vertx.core.net.NetClientOptions;
import io.vertx.core.tracing.TracingPolicy;
-import io.vertx.redis.client.*;
+import io.vertx.redis.client.Command;
+import io.vertx.redis.client.PoolOptions;
+import io.vertx.redis.client.Redis;
+import io.vertx.redis.client.RedisClusterConnectOptions;
+import io.vertx.redis.client.RedisConnection;
+import io.vertx.redis.client.RedisReplicas;
+import io.vertx.redis.client.Request;
+import io.vertx.redis.client.Response;
import io.vertx.redis.client.impl.types.MultiType;
import io.vertx.redis.client.impl.types.NumberType;
import io.vertx.redis.client.impl.types.SimpleStringType;
@@ -32,12 +42,8 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
-import static io.vertx.redis.client.Command.*;
-import static io.vertx.redis.client.Request.cmd;
-
public class RedisClusterClient extends BaseRedisClient implements Redis {
private static final Logger LOG = LoggerFactory.getLogger(RedisClusterClient.class);
@@ -53,11 +59,11 @@ public static void addMasterOnlyCommand(Command command) {
static {
// provided reducers
- addReducer(MSET, list ->
+ addReducer(Command.MSET, list ->
// Simple string reply: always OK since MSET can't fail.
SimpleStringType.OK);
- addReducer(DEL, list ->
+ addReducer(Command.DEL, list ->
NumberType.create(list.stream()
.mapToLong(el -> {
Long l = el.toLong();
@@ -68,7 +74,7 @@ public static void addMasterOnlyCommand(Command command) {
}
}).sum()));
- addReducer(MGET, list -> {
+ addReducer(Command.MGET, list -> {
int total = 0;
for (Response resp : list) {
total += resp.size();
@@ -84,7 +90,7 @@ public static void addMasterOnlyCommand(Command command) {
return multi;
});
- addReducer(KEYS, list -> {
+ addReducer(Command.KEYS, list -> {
int total = 0;
for (Response resp : list) {
total += resp.size();
@@ -100,11 +106,11 @@ public static void addMasterOnlyCommand(Command command) {
return multi;
});
- addReducer(FLUSHDB, list ->
+ addReducer(Command.FLUSHDB, list ->
// Simple string reply: always OK since FLUSHDB can't fail.
SimpleStringType.OK);
- addReducer(DBSIZE, list ->
+ addReducer(Command.DBSIZE, list ->
// Sum of key numbers on all Key Slots
NumberType.create(list.stream()
.mapToLong(el -> {
@@ -116,25 +122,23 @@ public static void addMasterOnlyCommand(Command command) {
}
}).sum()));
- addMasterOnlyCommand(WAIT);
+ addMasterOnlyCommand(Command.WAIT);
- addMasterOnlyCommand(SUBSCRIBE);
- addMasterOnlyCommand(PSUBSCRIBE);
- addMasterOnlyCommand(SSUBSCRIBE);
- addReducer(UNSUBSCRIBE, list -> SimpleStringType.OK);
- addReducer(PUNSUBSCRIBE, list -> SimpleStringType.OK);
- addReducer(SUNSUBSCRIBE, list -> SimpleStringType.OK);
+ addMasterOnlyCommand(Command.SUBSCRIBE);
+ addMasterOnlyCommand(Command.PSUBSCRIBE);
+ addMasterOnlyCommand(Command.SSUBSCRIBE);
+ addReducer(Command.UNSUBSCRIBE, list -> SimpleStringType.OK);
+ addReducer(Command.PUNSUBSCRIBE, list -> SimpleStringType.OK);
+ addReducer(Command.SUNSUBSCRIBE, list -> SimpleStringType.OK);
}
- private final RedisClusterConnectOptions connectOptions;
- private final PoolOptions poolOptions;
-
- private final AtomicReference> slots = new AtomicReference<>();
+ final RedisClusterConnectOptions connectOptions;
+ final SharedSlots sharedSlots;
public RedisClusterClient(Vertx vertx, NetClientOptions tcpOptions, PoolOptions poolOptions, RedisClusterConnectOptions connectOptions, TracingPolicy tracingPolicy) {
super(vertx, tcpOptions, poolOptions, connectOptions, tracingPolicy);
this.connectOptions = connectOptions;
- this.poolOptions = poolOptions;
+ this.sharedSlots = new SharedSlots(vertx, connectOptions, connectionManager);
// validate options
if (poolOptions.getMaxWaiting() < poolOptions.getMaxSize()) {
throw new IllegalStateException("Invalid options: maxWaiting < maxSize");
@@ -144,7 +148,7 @@ public RedisClusterClient(Vertx vertx, NetClientOptions tcpOptions, PoolOptions
@Override
public Future connect() {
final Promise promise = vertx.promise();
- getSlots(vertx.getOrCreateContext())
+ sharedSlots.get()
.onSuccess(slots -> connect(slots, promise))
.onFailure(promise::fail);
return promise.future();
@@ -157,7 +161,7 @@ private void connect(Slots slots, Handler> onConnec
final Map connections = new HashMap<>();
for (String endpoint : slots.endpoints()) {
- connectionManager.getConnection(endpoint, RedisReplicas.NEVER != connectOptions.getUseReplicas() ? cmd(READONLY) : null)
+ connectionManager.getConnection(endpoint, RedisReplicas.NEVER != connectOptions.getUseReplicas() ? Request.cmd(Command.READONLY) : null)
.onFailure(err -> {
// failed try with the next endpoint
failures.add(err);
@@ -198,87 +202,10 @@ private void connectionComplete(AtomicInteger counter, Slots slots, Map this.slots.set(null), connections)));
- }
- }
- }
-
- private Future getSlots(ContextInternal context) {
- while (true) {
- Future slots = this.slots.get();
- if (slots != null) {
- return slots;
- }
-
- Promise promise = context.promise();
- Future future = promise.future();
- if (this.slots.compareAndSet(null, future)) {
- LOG.debug("Obtaining hash slot assignment");
- // attempt to load the slots from the first good endpoint
- getSlots(connectOptions.getEndpoints(), 0, ConcurrentHashMap.newKeySet(), promise);
- return future;
+ onConnected.handle(Future.succeededFuture(new RedisClusterConnection(vertx, connectionManager,
+ connectOptions, sharedSlots, connections)));
}
}
}
- private void getSlots(List endpoints, int index, Set failures, Handler> onGotSlots) {
- if (index >= endpoints.size()) {
- // stop condition
- StringBuilder message = new StringBuilder("Cannot connect to any of the provided endpoints");
- for (Throwable failure : failures) {
- message.append("\n- ").append(failure);
- }
- onGotSlots.handle(Future.failedFuture(new RedisConnectException(message.toString())));
- scheduleCachedSlotsExpiration();
- return;
- }
-
- connectionManager.getConnection(endpoints.get(index), RedisReplicas.NEVER != connectOptions.getUseReplicas() ? cmd(READONLY) : null)
- .onFailure(err -> {
- // try with the next endpoint
- failures.add(err);
- getSlots(endpoints, index + 1, failures, onGotSlots);
- })
- .onSuccess(conn -> {
- getSlots(endpoints.get(index), conn).onComplete(result -> {
- // the connection is not needed anymore, regardless of success or failure
- // (on success, we just finish, on failure, we'll try another endpoint)
- conn.close().onFailure(LOG::warn);
-
- if (result.failed()) {
- // the slots command failed, try with next endpoint
- failures.add(result.cause());
- getSlots(endpoints, index + 1, failures, onGotSlots);
- } else {
- Slots slots = result.result();
- onGotSlots.handle(Future.succeededFuture(slots));
- scheduleCachedSlotsExpiration();
- }
- });
- });
- }
-
- private Future getSlots(String endpoint, RedisConnection conn) {
- return conn
- .send(cmd(CLUSTER).arg("SLOTS"))
- .compose(reply -> {
- if (reply == null || reply.size() == 0) {
- // no slots available we can't really proceed
- return Future.failedFuture("CLUSTER SLOTS No slots available in the cluster.");
- }
-
- Slots result;
- try {
- result = new Slots(endpoint, reply);
- } catch (Exception e) {
- return Future.failedFuture("CLUSTER SLOTS response invalid: " + e);
- }
- return Future.succeededFuture(result);
- });
- }
-
- private void scheduleCachedSlotsExpiration() {
- vertx.setTimer(connectOptions.getHashSlotCacheTTL(), ignored -> this.slots.set(null));
- }
}
diff --git a/src/main/java/io/vertx/redis/client/impl/RedisClusterConnection.java b/src/main/java/io/vertx/redis/client/impl/RedisClusterConnection.java
index 1594b390..8dff1caa 100644
--- a/src/main/java/io/vertx/redis/client/impl/RedisClusterConnection.java
+++ b/src/main/java/io/vertx/redis/client/impl/RedisClusterConnection.java
@@ -1,20 +1,32 @@
package io.vertx.redis.client.impl;
import io.vertx.codegen.annotations.Nullable;
-import io.vertx.core.*;
+import io.vertx.core.AsyncResult;
+import io.vertx.core.Future;
+import io.vertx.core.Handler;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
import io.vertx.core.internal.VertxInternal;
import io.vertx.core.internal.logging.Logger;
import io.vertx.core.internal.logging.LoggerFactory;
-import io.vertx.redis.client.*;
+import io.vertx.redis.client.Command;
+import io.vertx.redis.client.RedisClusterConnectOptions;
+import io.vertx.redis.client.RedisConnection;
+import io.vertx.redis.client.RedisReplicas;
+import io.vertx.redis.client.Request;
+import io.vertx.redis.client.Response;
import io.vertx.redis.client.impl.types.ErrorType;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
import java.util.function.Function;
-import static io.vertx.redis.client.Command.ASKING;
-import static io.vertx.redis.client.Command.AUTH;
-import static io.vertx.redis.client.Request.cmd;
-
public class RedisClusterConnection implements RedisConnection {
private static final Logger LOG = LoggerFactory.getLogger(RedisClusterConnection.class);
@@ -40,17 +52,17 @@ public static void addMasterOnlyCommand(Command command) {
}
private final VertxInternal vertx;
+ private final RedisConnectionManager connectionManager;
private final RedisClusterConnectOptions connectOptions;
- private final Slots slots;
- private final Runnable onMoved;
+ private final SharedSlots sharedSlots;
private final Map connections;
- RedisClusterConnection(Vertx vertx, RedisClusterConnectOptions connectOptions, Slots slots, Runnable onMoved,
- Map connections) {
+ RedisClusterConnection(Vertx vertx, RedisConnectionManager connectionManager, RedisClusterConnectOptions connectOptions,
+ SharedSlots sharedSlots, Map connections) {
this.vertx = (VertxInternal) vertx;
+ this.connectionManager = connectionManager;
this.connectOptions = connectOptions;
- this.slots = slots;
- this.onMoved = onMoved;
+ this.sharedSlots = sharedSlots;
this.connections = connections;
}
@@ -116,6 +128,11 @@ public RedisConnection endHandler(@Nullable Handler handler) {
@Override
public Future send(Request request) {
+ return sharedSlots.get()
+ .compose(slots -> send(request, slots));
+ }
+
+ private Future send(Request request, Slots slots) {
final Promise promise = vertx.promise();
// process commands for cluster mode
@@ -126,7 +143,7 @@ public Future send(Request request) {
if (cmd.needsGetKeys()) {
// it is required to resolve the keys at the server side as we cannot deduct where they are algorithmically
// we shall run this commands on the master node always
- send(selectEndpoint(-1, cmd.isReadOnly(args), true), RETRIES, req, promise);
+ send(selectEndpoint(slots, -1, cmd.isReadOnly(args), true), RETRIES, req, promise);
return promise.future();
}
@@ -157,12 +174,12 @@ public Future send(Request request) {
});
} else {
// it doesn't matter which node to use
- send(selectEndpoint(-1, cmd.isReadOnly(args), forceMasterEndpoint), RETRIES, req, promise);
+ send(selectEndpoint(slots, -1, cmd.isReadOnly(args), forceMasterEndpoint), RETRIES, req, promise);
}
return promise.future();
case 1:
// trivial option the command is single key
- send(selectEndpoint(ZModem.generate(keys.get(0)), cmd.isReadOnly(args), forceMasterEndpoint), RETRIES, req, promise);
+ send(selectEndpoint(slots, ZModem.generate(keys.get(0)), cmd.isReadOnly(args), forceMasterEndpoint), RETRIES, req, promise);
return promise.future();
default:
// hashSlot -1 indicates that not all keys of the command targets the same hash slot,
@@ -189,7 +206,7 @@ public Future send(Request request) {
for (Map.Entry kv : requests.entrySet()) {
final Promise p = vertx.promise();
- send(selectEndpoint(kv.getKey(), cmd.isReadOnly(args), forceMasterEndpoint), RETRIES, kv.getValue(), p);
+ send(selectEndpoint(slots, kv.getKey(), cmd.isReadOnly(args), forceMasterEndpoint), RETRIES, kv.getValue(), p);
responses.add(p.future());
}
@@ -250,11 +267,27 @@ private Map splitRequest(CommandImpl cmd, List args) {
}
private void send(String endpoint, int retries, Request command, Handler> handler) {
-
- final PooledRedisConnection connection = connections.get(endpoint);
-
+ PooledRedisConnection connection = connections.get(endpoint);
if (connection == null) {
- handler.handle(Future.failedFuture("Missing connection to: " + endpoint));
+ connectionManager.getConnection(endpoint, RedisReplicas.NEVER != connectOptions.getUseReplicas() ? Request.cmd(Command.READONLY) : null)
+ .onSuccess(conn -> {
+ synchronized (connections) {
+ if (connections.containsKey(endpoint)) {
+ conn.close()
+ .onFailure(t -> LOG.warn("Failed closing connection: " + t));
+ } else {
+ connections.put(endpoint, conn);
+ }
+ }
+ send(endpoint, retries, command, handler);
+ })
+ .onFailure(t -> {
+ if (retries > 0) {
+ send(endpoint, retries - 1, command, handler);
+ } else {
+ handler.handle(Future.failedFuture("Failed obtaining connection to: " + endpoint));
+ }
+ });
return;
}
@@ -264,32 +297,38 @@ private void send(String endpoint, int retries, Request command, Handler= 0) {
final ErrorType cause = (ErrorType) send.cause();
- if (cause.is("MOVED")) {
- this.onMoved.run();
- // cluster is unbalanced, need to reconnect
- handler.handle(Future.failedFuture(cause));
- return;
- }
+ boolean ask = cause.is("ASK");
+ boolean moved = cause.is("MOVED");
+ if (ask || moved) {
+ if (moved) {
+ sharedSlots.invalidate();
+ }
- if (cause.is("ASK")) {
- connection
- .send(cmd(ASKING))
- .onFailure(err -> handler.handle(Future.failedFuture(err)))
- .onSuccess(asking -> {
- // attempt to recover
- // REQUERY THE NEW ONE (we've got the correct details)
- String addr = cause.slice(' ', cause.is("ERR") ? 3 : 2);
-
- if (addr == null) {
- // bad message
- handler.handle(Future.failedFuture(cause));
- return;
+ // attempt to recover
+ String addr = cause.slice(' ', 2);
+ if (addr == null) {
+ // bad message
+ handler.handle(Future.failedFuture("Cannot find endpoint:port in redirection: " + cause));
+ return;
+ }
+
+ RedisURI uri = new RedisURI(endpoint);
+ if (addr.startsWith(":")) {
+ // unknown endpoint, need to use the current one but the provided port
+ addr = uri.socketAddress().host() + addr;
+ }
+ String newEndpoint = uri.protocol() + "://" + uri.userinfo() + addr;
+ if (ask) {
+ send(newEndpoint, retries - 1, Request.cmd(Command.ASKING), resp -> {
+ if (resp.failed()) {
+ handler.handle(Future.failedFuture("Failed ASKING: " + resp.cause() + ", caused by " + cause));
+ } else {
+ send(newEndpoint, retries - 1, command, handler);
}
- // inherit protocol config from the current connection
- final RedisURI uri = new RedisURI(endpoint);
- // re-run on the new endpoint
- send(uri.protocol() + "://" + uri.userinfo() + addr, retries - 1, command, handler);
});
+ } else {
+ send(newEndpoint, retries - 1, command, handler);
+ }
return;
}
@@ -303,7 +342,7 @@ private void send(String endpoint, int retries, Request command, Handler handler.handle(Future.failedFuture(err)))
.onSuccess(auth -> {
// again
@@ -323,6 +362,11 @@ private void send(String endpoint, int retries, Request command, Handler> batch(List requests) {
+ return sharedSlots.get()
+ .compose(slots -> batch(requests, slots));
+ }
+
+ private Future> batch(List requests, Slots slots) {
final Promise> promise = vertx.promise();
if (requests.isEmpty()) {
@@ -397,18 +441,34 @@ public Future> batch(List requests) {
// all keys are on the same slot!
//we just need to decide which endpoint to use based on additional options
- batch(selectEndpoint(correctSlot, readOnly, forceMasterEndpoint), RETRIES, requests, promise);
+ batch(selectEndpoint(slots, correctSlot, readOnly, forceMasterEndpoint), RETRIES, requests, promise);
}
return promise.future();
}
private void batch(String endpoint, int retries, List commands, Handler>> handler) {
-
- final RedisConnection connection = connections.get(endpoint);
-
+ RedisConnection connection = connections.get(endpoint);
if (connection == null) {
- handler.handle(Future.failedFuture("Missing connection to: " + endpoint));
+ connectionManager.getConnection(endpoint, RedisReplicas.NEVER != connectOptions.getUseReplicas() ? Request.cmd(Command.READONLY) : null)
+ .onSuccess(conn -> {
+ synchronized (connections) {
+ if (connections.containsKey(endpoint)) {
+ conn.close()
+ .onFailure(t -> LOG.warn("Failed closing connection: " + t));
+ } else {
+ connections.put(endpoint, conn);
+ }
+ }
+ batch(endpoint, retries, commands, handler);
+ })
+ .onFailure(t -> {
+ if (retries > 0) {
+ batch(endpoint, retries - 1, commands, handler);
+ } else {
+ handler.handle(Future.failedFuture("Failed obtaining connection to: " + endpoint));
+ }
+ });
return;
}
@@ -418,33 +478,38 @@ private void batch(String endpoint, int retries, List commands, Handler
if (send.failed() && send.cause() instanceof ErrorType && retries >= 0) {
final ErrorType cause = (ErrorType) send.cause();
- if (cause.is("MOVED")) {
- this.onMoved.run();
- // cluster is unbalanced, need to reconnect
- handler.handle(Future.failedFuture(cause));
- return;
- }
+ boolean ask = cause.is("ASK");
+ boolean moved = cause.is("MOVED");
+ if (ask || moved) {
+ if (moved) {
+ sharedSlots.invalidate();
+ }
- if (cause.is("ASK")) {
- connection
- .send(cmd(ASKING))
- .onFailure(err -> handler.handle(Future.failedFuture(err)))
- .onSuccess(asking -> {
- // attempt to recover
- // REQUERY THE NEW ONE (we've got the correct details)
- String addr = cause.slice(' ', cause.is("ERR") ? 3 : 2);
-
- if (addr == null) {
- // bad message
- handler.handle(Future.failedFuture(cause));
- return;
- }
+ // attempt to recover
+ String addr = cause.slice(' ', 2);
+ if (addr == null) {
+ // bad message
+ handler.handle(Future.failedFuture("Cannot find endpoint:port in redirection: " + cause));
+ return;
+ }
- // inherit protocol config from the current connection
- final RedisURI uri = new RedisURI(endpoint);
- // re-run on the new endpoint
- batch(uri.protocol() + "://" + uri.userinfo() + addr, retries - 1, commands, handler);
+ RedisURI uri = new RedisURI(endpoint);
+ if (addr.startsWith(":")) {
+ // unknown endpoint, need to use the current one but the provided port
+ addr = uri.socketAddress().host() + addr;
+ }
+ String newEndpoint = uri.protocol() + "://" + uri.userinfo() + addr;
+ if (ask) {
+ batch(newEndpoint, retries - 1, Collections.singletonList(Request.cmd(Command.ASKING)), resp -> {
+ if (resp.failed()) {
+ handler.handle(Future.failedFuture("Failed ASKING: " + resp.cause() + ", caused by " + cause));
+ } else {
+ batch(newEndpoint, retries - 1, commands, handler);
+ }
});
+ } else {
+ batch(newEndpoint, retries - 1, commands, handler);
+ }
return;
}
@@ -458,7 +523,7 @@ private void batch(String endpoint, int retries, List commands, Handler
if (cause.is("NOAUTH") && connectOptions.getPassword() != null) {
// try to authenticate
connection
- .send(cmd(AUTH).arg(connectOptions.getPassword()))
+ .send(Request.cmd(Command.AUTH).arg(connectOptions.getPassword()))
.onFailure(err -> handler.handle(Future.failedFuture(err)))
.onSuccess(auth -> {
// again
@@ -504,7 +569,7 @@ public boolean pendingQueueFull() {
/**
* Select a Redis client for the given key
*/
- private String selectEndpoint(int keySlot, boolean readOnly, boolean forceMasterEndpoint) {
+ private String selectEndpoint(Slots slots, int keySlot, boolean readOnly, boolean forceMasterEndpoint) {
// this command doesn't have keys, return any connection
// NOTE: this means replicas may be used for no key commands regardless of the config
if (keySlot == -1) {
diff --git a/src/main/java/io/vertx/redis/client/impl/RedisSentinelClient.java b/src/main/java/io/vertx/redis/client/impl/RedisSentinelClient.java
index 7cec44ed..61d67ad2 100644
--- a/src/main/java/io/vertx/redis/client/impl/RedisSentinelClient.java
+++ b/src/main/java/io/vertx/redis/client/impl/RedisSentinelClient.java
@@ -15,26 +15,34 @@
*/
package io.vertx.redis.client.impl;
-import io.vertx.core.*;
+import io.vertx.core.AsyncResult;
+import io.vertx.core.Future;
+import io.vertx.core.Handler;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
import io.vertx.core.internal.logging.Logger;
import io.vertx.core.internal.logging.LoggerFactory;
import io.vertx.core.net.NetClientOptions;
import io.vertx.core.tracing.TracingPolicy;
-import io.vertx.redis.client.*;
-import io.vertx.redis.client.impl.types.ErrorType;
+import io.vertx.redis.client.Command;
+import io.vertx.redis.client.PoolOptions;
+import io.vertx.redis.client.Redis;
+import io.vertx.redis.client.RedisConnectOptions;
+import io.vertx.redis.client.RedisConnection;
+import io.vertx.redis.client.RedisRole;
+import io.vertx.redis.client.RedisSentinelConnectOptions;
+import io.vertx.redis.client.Request;
+import io.vertx.redis.client.Response;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-
-import static io.vertx.redis.client.Command.*;
-import static io.vertx.redis.client.Request.cmd;
+import java.util.concurrent.atomic.AtomicReference;
public class RedisSentinelClient extends BaseRedisClient implements Redis {
- // We don't need to be secure, we just want so simple
- // randomization to avoid picking the same replica all the time
+ // we need some randomness, it doesn't need to be cryptographically secure
private static final Random RANDOM = new Random();
private static class Pair {
@@ -50,6 +58,7 @@ private static class Pair {
private static final Logger LOG = LoggerFactory.getLogger(RedisSentinelClient.class);
private final RedisSentinelConnectOptions connectOptions;
+ private final AtomicReference failover = new AtomicReference<>();
public RedisSentinelClient(Vertx vertx, NetClientOptions tcpOptions, PoolOptions poolOptions, RedisSentinelConnectOptions connectOptions, TracingPolicy tracingPolicy) {
super(vertx, tcpOptions, poolOptions, connectOptions, tracingPolicy);
@@ -64,7 +73,6 @@ public RedisSentinelClient(Vertx vertx, NetClientOptions tcpOptions, PoolOptions
public Future connect() {
final Promise promise = vertx.promise();
- // sentinel (HA) requires 2 connections, one to watch for sentinel events and the connection itself
createConnectionInternal(connectOptions, connectOptions.getRole(), createConnection -> {
if (createConnection.failed()) {
promise.fail(createConnection.cause());
@@ -73,47 +81,55 @@ public Future connect() {
final PooledRedisConnection conn = createConnection.result();
- createConnectionInternal(connectOptions, RedisRole.SENTINEL, create -> {
- if (create.failed()) {
- LOG.error("Redis PUB/SUB wrap failed.", create.cause());
- promise.fail(create.cause());
- return;
- }
-
- PooledRedisConnection sentinel = create.result();
-
- sentinel
- .handler(msg -> {
- if (msg.type() == ResponseType.MULTI) {
- if ("MESSAGE".equalsIgnoreCase(msg.get(0).toString())) {
- // we don't care about the payload
- if (conn != null) {
- ((RedisStandaloneConnection) conn.actual()).fail(ErrorType.create("SWITCH-MASTER Received +switch-master message from Redis Sentinel."));
- } else {
- LOG.warn("Received +switch-master message from Redis Sentinel.");
- }
- }
- }
- });
-
- sentinel
- .send(cmd(SUBSCRIBE).arg("+switch-master"))
- .onFailure(promise::fail)
- .onSuccess(ok -> promise.complete(new RedisSentinelConnection(conn, sentinel)));
-
- sentinel.exceptionHandler(t -> {
- if (conn != null) {
- ((RedisStandaloneConnection) conn.actual()).fail(t);
- } else {
- LOG.error("Unhandled exception in Sentinel PUBSUB", t);
- }
- });
- });
+ if (connectOptions.getRole() == RedisRole.SENTINEL || connectOptions.getRole() == RedisRole.REPLICA) {
+ // it is possible that a replica is later promoted to a master, but that shouldn't be too big of a deal
+ promise.complete(conn);
+ return;
+ }
+ if (!connectOptions.isAutoFailover()) {
+ // no auto failover, return the master connection directly
+ promise.complete(conn);
+ return;
+ }
+
+ SentinelFailover failover = setupFailover();
+ RedisSentinelConnection sentinelConn = new RedisSentinelConnection(conn, failover);
+ promise.complete(sentinelConn);
});
return promise.future();
}
+ private SentinelFailover setupFailover() {
+ SentinelFailover result = this.failover.get();
+
+ if (result == null) {
+ result = new SentinelFailover(connectOptions.getMasterName(), this::createConnectionInternal);
+ if (this.failover.compareAndSet(null, result)) {
+ result.start();
+ } else {
+ result = this.failover.get();
+ }
+ }
+
+ return result;
+ }
+
+ @Override
+ public void close() {
+ SentinelFailover failover = this.failover.get();
+ if (failover != null) {
+ failover.close();
+ }
+ super.close();
+ }
+
+ private Future createConnectionInternal(RedisRole role) {
+ Promise promise = Promise.promise();
+ createConnectionInternal(connectOptions, role, promise);
+ return promise.future();
+ }
+
private void createConnectionInternal(RedisSentinelConnectOptions options, RedisRole role, Handler> onCreate) {
final Handler> createAndConnect = resolve -> {
@@ -123,19 +139,18 @@ private void createConnectionInternal(RedisSentinelConnectOptions options, Redis
}
final RedisURI uri = resolve.result();
- final String endpoint = getBaseEndpoint(uri);
final Request setup;
// `SELECT` is only allowed on non-sentinel nodes
+ // we don't send `READONLY` setup to replica nodes, because that's a cluster-only command
if (role != RedisRole.SENTINEL && uri.select() != null) {
- setup = cmd(SELECT).arg(uri.select());
+ setup = Request.cmd(Command.SELECT).arg(uri.select());
} else {
- // we don't send `READONLY` setup to replica nodes, because that's a cluster-only command
setup = null;
}
// wrap a new client
- connectionManager.getConnection(endpoint, setup).onComplete(onCreate);
+ connectionManager.getConnection(uri.baseUri(), setup).onComplete(onCreate);
};
switch (role) {
@@ -207,12 +222,12 @@ private void isSentinelOk(String endpoint, RedisConnectOptions argument, Handler
// but can contain authentication
final RedisURI uri = new RedisURI(endpoint);
- connectionManager.getConnection(getBaseEndpoint(uri), null)
+ connectionManager.getConnection(uri.baseUri(), null)
.onFailure(err -> handler.handle(Future.failedFuture(err)))
.onSuccess(conn -> {
// Send a command just to check we have a working node
conn
- .send(cmd(PING))
+ .send(Request.cmd(Command.PING))
.onFailure(err -> handler.handle(Future.failedFuture(err)))
.onSuccess(ok -> handler.handle(Future.succeededFuture(uri)))
.eventually(() -> conn.close().onFailure(LOG::warn));
@@ -223,13 +238,13 @@ private void getMasterFromEndpoint(String endpoint, RedisSentinelConnectOptions
// we can't use the endpoint as is, it should not contain a database selection,
// but can contain authentication
final RedisURI uri = new RedisURI(endpoint);
- connectionManager.getConnection(getBaseEndpoint(uri), null)
+ connectionManager.getConnection(uri.baseUri(), null)
.onFailure(err -> handler.handle(Future.failedFuture(err)))
.onSuccess(conn -> {
final String masterName = options.getMasterName();
// Send a command just to check we have a working node
conn
- .send(cmd(SENTINEL).arg("GET-MASTER-ADDR-BY-NAME").arg(masterName))
+ .send(Request.cmd(Command.SENTINEL).arg("GET-MASTER-ADDR-BY-NAME").arg(masterName))
.onFailure(err -> handler.handle(Future.failedFuture(err)))
.onSuccess(response -> {
if (response == null) {
@@ -248,13 +263,13 @@ private void getReplicaFromEndpoint(String endpoint, RedisSentinelConnectOptions
// we can't use the endpoint as is, it should not contain a database selection,
// but can contain authentication
final RedisURI uri = new RedisURI(endpoint);
- connectionManager.getConnection(getBaseEndpoint(uri), null)
+ connectionManager.getConnection(uri.baseUri(), null)
.onFailure(err -> handler.handle(Future.failedFuture(err)))
.onSuccess(conn -> {
final String masterName = options.getMasterName();
// Send a command just to check we have a working node
conn
- .send(cmd(SENTINEL).arg("SLAVES").arg(masterName))
+ .send(Request.cmd(Command.SENTINEL).arg("SLAVES").arg(masterName))
.onFailure(err -> handler.handle(Future.failedFuture(err)))
.onSuccess(response -> {
// Test the response
@@ -290,24 +305,4 @@ private void getReplicaFromEndpoint(String endpoint, RedisSentinelConnectOptions
});
}
- private String getBaseEndpoint(RedisURI uri) {
- StringBuilder sb = new StringBuilder();
-
- if (uri.unix()) {
- sb.append("unix://");
- sb.append(uri.socketAddress().path());
- } else {
- sb.append("redis");
- if (uri.ssl()) {
- sb.append('s');
- }
- sb.append("://");
- sb.append(uri.userinfo());
- sb.append(uri.socketAddress().host());
- sb.append(':');
- sb.append(uri.socketAddress().port());
- }
-
- return sb.toString();
- }
}
diff --git a/src/main/java/io/vertx/redis/client/impl/RedisSentinelConnection.java b/src/main/java/io/vertx/redis/client/impl/RedisSentinelConnection.java
index c865236e..e8b81fd9 100644
--- a/src/main/java/io/vertx/redis/client/impl/RedisSentinelConnection.java
+++ b/src/main/java/io/vertx/redis/client/impl/RedisSentinelConnection.java
@@ -8,71 +8,81 @@
import io.vertx.redis.client.Response;
import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
public class RedisSentinelConnection implements RedisConnection {
- private final PooledRedisConnection connection;
- private final PooledRedisConnection sentinel;
+ private final AtomicReference connection;
+ private final SentinelFailover failover;
- public RedisSentinelConnection(PooledRedisConnection connection, PooledRedisConnection sentinel) {
- this.connection = connection;
- this.sentinel = sentinel;
+ public RedisSentinelConnection(PooledRedisConnection connection, SentinelFailover failover) {
+ this.connection = new AtomicReference<>(connection);
+ this.failover = failover;
+ failover.addConnection(this);
+ }
+
+ void reconnect(PooledRedisConnection newConnection) {
+ connection.set(newConnection);
}
@Override
public RedisConnection exceptionHandler(Handler handler) {
- connection.exceptionHandler(handler);
+ connection.get().exceptionHandler(handler);
return this;
}
@Override
public RedisConnection handler(Handler handler) {
- connection.handler(handler);
+ connection.get().handler(handler);
return this;
}
@Override
public RedisConnection pause() {
- connection.pause();
+ connection.get().pause();
return this;
}
@Override
public RedisConnection resume() {
- connection.resume();
+ connection.get().resume();
return this;
}
@Override
public RedisConnection fetch(long amount) {
- connection.fetch(amount);
+ connection.get().fetch(amount);
return this;
}
@Override
public RedisConnection endHandler(@Nullable Handler endHandler) {
- connection.endHandler(endHandler);
+ connection.get().endHandler(endHandler);
return this;
}
@Override
public Future<@Nullable Response> send(Request command) {
- return connection.send(command);
+ return connection.get().send(command);
}
@Override
public Future> batch(List commands) {
- return connection.batch(commands);
+ return connection.get().batch(commands);
+ }
+
+ Future closeDelegate() {
+ return connection.get().close();
}
@Override
public Future close() {
- return sentinel.close()
- .compose(done -> connection.close());
+ failover.removeConnection(this);
+ return closeDelegate();
}
@Override
public boolean pendingQueueFull() {
- return connection.pendingQueueFull();
+ return connection.get().pendingQueueFull();
}
}
diff --git a/src/main/java/io/vertx/redis/client/impl/RedisURI.java b/src/main/java/io/vertx/redis/client/impl/RedisURI.java
index cc5cff7a..61f06ccf 100644
--- a/src/main/java/io/vertx/redis/client/impl/RedisURI.java
+++ b/src/main/java/io/vertx/redis/client/impl/RedisURI.java
@@ -17,11 +17,11 @@
import io.vertx.core.net.SocketAddress;
-import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLDecoder;
import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -162,19 +162,11 @@ public RedisURI(String connectionString) {
}
private static String urlDecode(String raw) {
- try {
- return URLDecoder.decode(raw, "UTF-8");
- } catch (UnsupportedEncodingException e) {
- throw new RuntimeException(e);
- }
+ return URLDecoder.decode(raw, StandardCharsets.UTF_8);
}
private static String urlEncode(String raw) {
- try {
- return URLEncoder.encode(raw, "UTF-8");
- } catch (UnsupportedEncodingException e) {
- throw new RuntimeException(e);
- }
+ return URLEncoder.encode(raw, StandardCharsets.UTF_8);
}
private static Map parseQuery(URI uri) {
@@ -246,6 +238,29 @@ public String protocol() {
}
}
+ /**
+ * Returns the base of this URI, which consists of the scheme, optional user info, host and port
+ * (or path in case of a UNIX domain socket). Does not include the database number or the query parameters.
+ */
+ public String baseUri() {
+ StringBuilder result = new StringBuilder();
+ if (unix()) {
+ result.append("unix://");
+ result.append(socketAddress().path());
+ } else {
+ result.append("redis");
+ if (ssl()) {
+ result.append('s');
+ }
+ result.append("://");
+ result.append(userinfo());
+ result.append(socketAddress().host());
+ result.append(':');
+ result.append(socketAddress().port());
+ }
+ return result.toString();
+ }
+
@Override
public String toString() {
return protocol() + "://" + socketAddress() + "/" + (select == null ? "" : select);
diff --git a/src/main/java/io/vertx/redis/client/impl/SentinelFailover.java b/src/main/java/io/vertx/redis/client/impl/SentinelFailover.java
new file mode 100644
index 00000000..0cc327f6
--- /dev/null
+++ b/src/main/java/io/vertx/redis/client/impl/SentinelFailover.java
@@ -0,0 +1,102 @@
+package io.vertx.redis.client.impl;
+
+import io.vertx.core.Future;
+import io.vertx.core.internal.logging.Logger;
+import io.vertx.core.internal.logging.LoggerFactory;
+import io.vertx.redis.client.Command;
+import io.vertx.redis.client.RedisRole;
+import io.vertx.redis.client.Request;
+import io.vertx.redis.client.ResponseType;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+class SentinelFailover {
+ private static final Logger LOG = LoggerFactory.getLogger(SentinelFailover.class);
+
+ private static final int RETRIES = 3;
+
+ private final String masterSetName;
+ private final Function> connectionFactory;
+
+ private final AtomicReference sentinelConnection = new AtomicReference<>();
+ private final Set masterConnections = ConcurrentHashMap.newKeySet();
+
+ SentinelFailover(String masterSetName, Function> connectionFactory) {
+ this.masterSetName = masterSetName;
+ this.connectionFactory = connectionFactory;
+ }
+
+ void start() {
+ start(RETRIES);
+ }
+
+ private void start(int retries) {
+ connectionFactory.apply(RedisRole.SENTINEL)
+ .onFailure(t -> {
+ if (retries == 0) {
+ LOG.error("Failed to obtain a connection to Redis sentinel, automatic failover will not work: " + t);
+ } else {
+ start(retries - 1);
+ }
+ })
+ .onSuccess(sentinel -> {
+ PooledRedisConnection old = sentinelConnection.getAndSet(sentinel);
+ if (old != null) {
+ old.close()
+ .onFailure(err -> LOG.warn("Failed to close connection: " + err));
+ }
+
+ sentinel.handler(msg -> {
+ if (msg.type() == ResponseType.PUSH
+ && "message".equalsIgnoreCase(msg.get(0).toString())
+ && msg.get(2).toString().startsWith(masterSetName + " ")) {
+ reconnectAll();
+ }
+ });
+ sentinel.exceptionHandler(t -> {
+ sentinel.close();
+ start(RETRIES);
+ });
+ sentinel.send(Request.cmd(Command.SUBSCRIBE).arg("+switch-master"))
+ .onFailure(t -> {
+ sentinel.close();
+ if (retries == 0) {
+ LOG.error("Failed to subscribe +switch-master on Redis sentinel connection, reconnection to new master on failover will not work: " + t);
+ } else {
+ start(retries - 1);
+ }
+ });
+ });
+ }
+
+ private void reconnectAll() {
+ for (RedisSentinelConnection connection : masterConnections) {
+ connection.closeDelegate()
+ .recover(ignored -> Future.succeededFuture())
+ .compose(ignored -> connectionFactory.apply(RedisRole.MASTER))
+ .onSuccess(connection::reconnect)
+ .onFailure(t -> {
+ LOG.error("Failed to reconnect to master after failover: " + t);
+ });
+ }
+ }
+
+ void addConnection(RedisSentinelConnection sentinelConn) {
+ masterConnections.add(sentinelConn);
+ }
+
+ void removeConnection(RedisSentinelConnection sentinelConn) {
+ masterConnections.remove(sentinelConn);
+ }
+
+ void close() {
+ PooledRedisConnection sentinelConnection = this.sentinelConnection.get();
+ if (sentinelConnection != null) {
+ sentinelConnection.close()
+ .onFailure(err -> LOG.warn("Failed to close connection: " + err));
+ }
+ }
+}
diff --git a/src/main/java/io/vertx/redis/client/impl/SharedSlots.java b/src/main/java/io/vertx/redis/client/impl/SharedSlots.java
new file mode 100644
index 00000000..b095157d
--- /dev/null
+++ b/src/main/java/io/vertx/redis/client/impl/SharedSlots.java
@@ -0,0 +1,121 @@
+package io.vertx.redis.client.impl;
+
+import io.vertx.core.AsyncResult;
+import io.vertx.core.Future;
+import io.vertx.core.Handler;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import io.vertx.core.internal.logging.Logger;
+import io.vertx.core.internal.logging.LoggerFactory;
+import io.vertx.redis.client.Command;
+import io.vertx.redis.client.RedisClusterConnectOptions;
+import io.vertx.redis.client.RedisConnection;
+import io.vertx.redis.client.RedisReplicas;
+import io.vertx.redis.client.Request;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Exactly one instance of this class exists for each instance of {@link RedisClusterClient}
+ * and is also shared between all {@link RedisClusterConnection}s obtained from that client.
+ */
+class SharedSlots {
+ private static final Logger LOG = LoggerFactory.getLogger(SharedSlots.class);
+
+ private final Vertx vertx;
+ private final RedisClusterConnectOptions connectOptions;
+ private final RedisConnectionManager connectionManager;
+
+ private final AtomicReference> slots = new AtomicReference<>();
+
+ SharedSlots(Vertx vertx, RedisClusterConnectOptions connectOptions, RedisConnectionManager connectionManager) {
+ this.vertx = vertx;
+ this.connectOptions = connectOptions;
+ this.connectionManager = connectionManager;
+ }
+
+ Future get() {
+ while (true) {
+ Future slots = this.slots.get();
+ if (slots != null) {
+ return slots;
+ }
+
+ Promise promise = Promise.promise();
+ Future future = promise.future();
+ if (this.slots.compareAndSet(null, future)) {
+ LOG.debug("Obtaining hash slot assignment");
+ // attempt to load the slots from the first good endpoint
+ getSlots(connectOptions.getEndpoints(), 0, ConcurrentHashMap.newKeySet(), promise);
+ return future;
+ }
+ }
+ }
+
+ private void getSlots(List endpoints, int index, Set failures, Handler> onGotSlots) {
+ if (index >= endpoints.size()) {
+ // stop condition
+ StringBuilder message = new StringBuilder("Cannot connect to any of the provided endpoints");
+ for (Throwable failure : failures) {
+ message.append("\n- ").append(failure);
+ }
+ onGotSlots.handle(Future.failedFuture(new RedisConnectException(message.toString())));
+ scheduleInvalidation();
+ return;
+ }
+
+ connectionManager.getConnection(endpoints.get(index), RedisReplicas.NEVER != connectOptions.getUseReplicas() ? Request.cmd(Command.READONLY) : null)
+ .onFailure(err -> {
+ // try with the next endpoint
+ failures.add(err);
+ getSlots(endpoints, index + 1, failures, onGotSlots);
+ })
+ .onSuccess(conn -> {
+ getSlots(endpoints.get(index), conn).onComplete(result -> {
+ // the connection is not needed anymore, regardless of success or failure
+ // (on success, we just finish, on failure, we'll try another endpoint)
+ conn.close().onFailure(LOG::warn);
+
+ if (result.failed()) {
+ // the slots command failed, try with next endpoint
+ failures.add(result.cause());
+ getSlots(endpoints, index + 1, failures, onGotSlots);
+ } else {
+ Slots slots = result.result();
+ onGotSlots.handle(Future.succeededFuture(slots));
+ scheduleInvalidation();
+ }
+ });
+ });
+ }
+
+ private Future getSlots(String endpoint, RedisConnection conn) {
+ return conn
+ .send(Request.cmd(Command.CLUSTER).arg("SLOTS"))
+ .compose(reply -> {
+ if (reply == null || reply.size() == 0) {
+ // no slots available we can't really proceed
+ return Future.failedFuture("CLUSTER SLOTS No slots available in the cluster.");
+ }
+
+ Slots result;
+ try {
+ result = new Slots(endpoint, reply);
+ } catch (Exception e) {
+ return Future.failedFuture("CLUSTER SLOTS response invalid: " + e);
+ }
+ return Future.succeededFuture(result);
+ });
+ }
+
+ void invalidate() {
+ slots.set(null);
+ }
+
+ void scheduleInvalidation() {
+ vertx.setTimer(connectOptions.getHashSlotCacheTTL(), ignored -> invalidate());
+ }
+}
diff --git a/src/test/java/io/vertx/redis/client/test/ClusterUtils.java b/src/test/java/io/vertx/redis/client/test/ClusterUtils.java
new file mode 100644
index 00000000..b428d1ab
--- /dev/null
+++ b/src/test/java/io/vertx/redis/client/test/ClusterUtils.java
@@ -0,0 +1,59 @@
+package io.vertx.redis.client.test;
+
+import io.vertx.core.Future;
+import io.vertx.core.Vertx;
+import io.vertx.redis.client.Command;
+import io.vertx.redis.client.Redis;
+import io.vertx.redis.client.RedisConnection;
+import io.vertx.redis.client.Request;
+import io.vertx.redis.client.Response;
+
+import java.util.function.BiPredicate;
+
+class ClusterUtils {
+ private final Vertx vertx;
+ private final Redis client;
+
+ ClusterUtils(Vertx vertx, Redis client) {
+ this.vertx = vertx;
+ this.client = client;
+ }
+
+ Future connectToMasterThatServesSlot(int slot) {
+ return connectToMasterAndReturnId((min, max) -> slot >= min && slot <= max);
+ }
+
+ Future connectToMasterThatDoesntServeSlot(int slot) {
+ return connectToMasterAndReturnId((min, max) -> slot < min || slot > max);
+ }
+
+ private Future connectToMasterAndReturnId(BiPredicate predicate) {
+ return client.send(Request.cmd(Command.CLUSTER).arg("SLOTS"))
+ .compose(slots -> {
+ for (int i = 0; i < slots.size(); i++) {
+ Response slot = slots.get(i);
+ if (predicate.test(slot.get(0).toInteger(), slot.get(1).toInteger())) {
+ Response node = slot.get(2);
+ String endpoint = "redis://" + node.get(0) + ":" + node.get(1);
+ String id = node.get(2).toString();
+ Redis redis = Redis.createClient(vertx, endpoint);
+ return redis.connect()
+ .map(conn -> new Result(redis, conn, id));
+ }
+ }
+ return Future.failedFuture("Couldn't find matching slot");
+ });
+ }
+
+ static class Result {
+ final Redis redis;
+ final RedisConnection conn;
+ final String id;
+
+ Result(Redis redis, RedisConnection conn, String id) {
+ this.redis = redis;
+ this.conn = conn;
+ this.id = id;
+ }
+ }
+}
diff --git a/src/test/java/io/vertx/redis/client/test/RedisClusterAskTest.java b/src/test/java/io/vertx/redis/client/test/RedisClusterAskTest.java
new file mode 100644
index 00000000..3bcd43f0
--- /dev/null
+++ b/src/test/java/io/vertx/redis/client/test/RedisClusterAskTest.java
@@ -0,0 +1,107 @@
+package io.vertx.redis.client.test;
+
+import io.vertx.core.Future;
+import io.vertx.ext.unit.Async;
+import io.vertx.ext.unit.TestContext;
+import io.vertx.ext.unit.junit.RunTestOnContext;
+import io.vertx.ext.unit.junit.VertxUnitRunner;
+import io.vertx.redis.client.Command;
+import io.vertx.redis.client.Redis;
+import io.vertx.redis.client.RedisClientType;
+import io.vertx.redis.client.RedisConnection;
+import io.vertx.redis.client.RedisOptions;
+import io.vertx.redis.client.RedisReplicas;
+import io.vertx.redis.client.Request;
+import io.vertx.redis.containers.RedisCluster;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+@RunWith(VertxUnitRunner.class)
+public class RedisClusterAskTest {
+ @ClassRule
+ public static final RedisCluster redis = new RedisCluster();
+
+ @Rule
+ public final RunTestOnContext rule = new RunTestOnContext();
+
+ private final RedisOptions options = new RedisOptions()
+ .setType(RedisClientType.CLUSTER)
+ .setUseReplicas(RedisReplicas.NEVER)
+ .addConnectionString(redis.getRedisNode0Uri())
+ .addConnectionString(redis.getRedisNode1Uri())
+ .addConnectionString(redis.getRedisNode2Uri())
+ .addConnectionString(redis.getRedisNode3Uri())
+ .addConnectionString(redis.getRedisNode4Uri())
+ .addConnectionString(redis.getRedisNode5Uri());
+
+ private Redis client;
+ private ClusterUtils cluster;
+
+ @Before
+ public void createClient() {
+ client = Redis.createClient(rule.vertx(), options);
+ cluster = new ClusterUtils(rule.vertx(), client);
+ }
+
+ @After
+ public void cleanRedis() {
+ client.close();
+ }
+
+ @Test
+ public void test(TestContext test) {
+ Async async = test.async();
+
+ // slot number: 16287
+ // keys hashing to the slot: x, exs
+ int slot = 16287;
+ String key1 = "x";
+ String key2 = "exs";
+
+ client.connect().compose(clusterConn -> {
+ return cluster.connectToMasterThatServesSlot(slot).compose(masterResult -> {
+ Redis master = masterResult.redis;
+ RedisConnection masterConn = masterResult.conn;
+ String masterId = masterResult.id;
+ return cluster.connectToMasterThatDoesntServeSlot(slot).compose(otherMasterResult -> {
+ Redis otherMaster = otherMasterResult.redis;
+ RedisConnection otherMasterConn = otherMasterResult.conn;
+ String otherMasterId = otherMasterResult.id;
+ return clusterConn.send(Request.cmd(Command.SET).arg(key1).arg("fubar"))
+ .compose(ignored -> {
+ return otherMasterConn.send(Request.cmd(Command.CLUSTER).arg("SETSLOT").arg(slot).arg("IMPORTING").arg(masterId));
+ })
+ .compose(ignored -> {
+ return masterConn.send(Request.cmd(Command.CLUSTER).arg("SETSLOT").arg(slot).arg("MIGRATING").arg(otherMasterId));
+ })
+ .compose(ignored -> {
+ return clusterConn.send(Request.cmd(Command.GET).arg(key1));
+ })
+ .compose(result -> {
+ test.assertEquals("fubar", result.toString());
+ return clusterConn.send(Request.cmd(Command.GET).arg(key2)); // ASK
+ })
+ .compose(result -> {
+ test.assertEquals(null, result);
+ return clusterConn.send(Request.cmd(Command.SET).arg(key2).arg("quux")); // ASK
+ })
+ .compose(ignored -> {
+ return clusterConn.send(Request.cmd(Command.GET).arg(key2)); // ASK
+ })
+ .compose(result -> {
+ test.assertEquals("quux", result.toString());
+ master.close();
+ otherMaster.close();
+ return Future.succeededFuture();
+ });
+ });
+ });
+ }).onComplete(test.asyncAssertSuccess(ignored -> {
+ async.complete();
+ }));
+ }
+}
diff --git a/src/test/java/io/vertx/redis/client/test/RedisClusterMovedTest.java b/src/test/java/io/vertx/redis/client/test/RedisClusterMovedTest.java
new file mode 100644
index 00000000..0e66ed82
--- /dev/null
+++ b/src/test/java/io/vertx/redis/client/test/RedisClusterMovedTest.java
@@ -0,0 +1,116 @@
+package io.vertx.redis.client.test;
+
+import io.vertx.core.Future;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.unit.Async;
+import io.vertx.ext.unit.TestContext;
+import io.vertx.ext.unit.junit.RunTestOnContext;
+import io.vertx.ext.unit.junit.VertxUnitRunner;
+import io.vertx.redis.client.Command;
+import io.vertx.redis.client.Redis;
+import io.vertx.redis.client.RedisClientType;
+import io.vertx.redis.client.RedisConnection;
+import io.vertx.redis.client.RedisOptions;
+import io.vertx.redis.client.RedisReplicas;
+import io.vertx.redis.client.Request;
+import io.vertx.redis.client.impl.PooledRedisConnection;
+import io.vertx.redis.containers.RedisCluster;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+@RunWith(VertxUnitRunner.class)
+public class RedisClusterMovedTest {
+ @ClassRule
+ public static final RedisCluster redis = new RedisCluster();
+
+ @Rule
+ public final RunTestOnContext rule = new RunTestOnContext();
+
+ private final RedisOptions options = new RedisOptions()
+ .setType(RedisClientType.CLUSTER)
+ .setUseReplicas(RedisReplicas.NEVER)
+ .addConnectionString(redis.getRedisNode0Uri())
+ .addConnectionString(redis.getRedisNode1Uri())
+ .addConnectionString(redis.getRedisNode2Uri())
+ .addConnectionString(redis.getRedisNode3Uri())
+ .addConnectionString(redis.getRedisNode4Uri())
+ .addConnectionString(redis.getRedisNode5Uri());
+
+ private Redis client;
+ private ClusterUtils cluster;
+
+ @Before
+ public void createClient() {
+ client = Redis.createClient(rule.vertx(), options);
+ cluster = new ClusterUtils(rule.vertx(), client);
+ }
+
+ @After
+ public void cleanRedis() {
+ client.close();
+ }
+
+ @Test
+ public void test(TestContext test) {
+ Async async = test.async();
+
+ // slot number: 16287
+ // keys hashing to the slot: x, exs
+ int slot = 16287;
+ String key1 = "x";
+ String key2 = "exs";
+
+ client.connect().compose(clusterConn -> {
+ return cluster.connectToMasterThatServesSlot(slot).compose(masterResult -> {
+ Redis master = masterResult.redis;
+ RedisConnection masterConn = masterResult.conn;
+ String masterId = masterResult.id;
+ return cluster.connectToMasterThatDoesntServeSlot(slot).compose(otherMasterResult -> {
+ Redis otherMaster = otherMasterResult.redis;
+ RedisConnection otherMasterConn = otherMasterResult.conn;
+ String otherMasterId = otherMasterResult.id;
+ return clusterConn.send(Request.cmd(Command.SET).arg(key1).arg("fubar"))
+ .compose(ignored -> {
+ return clusterConn.send(Request.cmd(Command.SET).arg(key2).arg("quux"));
+ })
+ .compose(ignored -> {
+ return otherMasterConn.send(Request.cmd(Command.CLUSTER).arg("SETSLOT").arg(slot).arg("IMPORTING").arg(masterId));
+ })
+ .compose(ignored -> {
+ return masterConn.send(Request.cmd(Command.CLUSTER).arg("SETSLOT").arg(slot).arg("MIGRATING").arg(otherMasterId));
+ })
+ .compose(ignored -> {
+ SocketAddress otherMasterAddr = ((PooledRedisConnection) otherMasterConn).actual().uri().socketAddress();
+ return masterConn.send(Request.cmd(Command.MIGRATE).arg(otherMasterAddr.host()).arg(otherMasterAddr.port())
+ .arg("").arg(0).arg(5000).arg("KEYS").arg(key1).arg(key2));
+ })
+ .compose(ignored -> {
+ return masterConn.send(Request.cmd(Command.CLUSTER).arg("SETSLOT").arg(slot).arg("NODE").arg(otherMasterId));
+ })
+ .compose(ignored -> {
+ return otherMasterConn.send(Request.cmd(Command.CLUSTER).arg("SETSLOT").arg(slot).arg("NODE").arg(otherMasterId));
+ })
+ .compose(ignored -> {
+ return clusterConn.send(Request.cmd(Command.GET).arg(key1)); // MOVED
+ })
+ .compose(result -> {
+ test.assertEquals("fubar", result.toString());
+ return clusterConn.send(Request.cmd(Command.GET).arg(key2)); // not MOVED, slots were reread
+ })
+ .compose(result -> {
+ test.assertEquals("quux", result.toString());
+ master.close();
+ otherMaster.close();
+ return Future.succeededFuture();
+ });
+ });
+ });
+ }).onComplete(test.asyncAssertSuccess(ignored -> {
+ async.complete();
+ }));
+ }
+}
diff --git a/src/test/java/io/vertx/redis/client/test/RedisClusterMovedToNewNodeTest.java b/src/test/java/io/vertx/redis/client/test/RedisClusterMovedToNewNodeTest.java
new file mode 100644
index 00000000..165108ee
--- /dev/null
+++ b/src/test/java/io/vertx/redis/client/test/RedisClusterMovedToNewNodeTest.java
@@ -0,0 +1,135 @@
+package io.vertx.redis.client.test;
+
+import io.vertx.core.Future;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.unit.Async;
+import io.vertx.ext.unit.TestContext;
+import io.vertx.ext.unit.junit.RunTestOnContext;
+import io.vertx.ext.unit.junit.VertxUnitRunner;
+import io.vertx.redis.client.Command;
+import io.vertx.redis.client.Redis;
+import io.vertx.redis.client.RedisClientType;
+import io.vertx.redis.client.RedisConnection;
+import io.vertx.redis.client.RedisOptions;
+import io.vertx.redis.client.RedisReplicas;
+import io.vertx.redis.client.Request;
+import io.vertx.redis.client.impl.PooledRedisConnection;
+import io.vertx.redis.containers.RedisCluster;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+@RunWith(VertxUnitRunner.class)
+public class RedisClusterMovedToNewNodeTest {
+ @ClassRule
+ public static final RedisCluster redis = new RedisCluster();
+
+ @Rule
+ public final RunTestOnContext rule = new RunTestOnContext();
+
+ private final RedisOptions options = new RedisOptions()
+ .setType(RedisClientType.CLUSTER)
+ .setUseReplicas(RedisReplicas.NEVER)
+ .addConnectionString(redis.getRedisNode0Uri())
+ .addConnectionString(redis.getRedisNode1Uri())
+ .addConnectionString(redis.getRedisNode2Uri())
+ .addConnectionString(redis.getRedisNode3Uri())
+ .addConnectionString(redis.getRedisNode4Uri())
+ .addConnectionString(redis.getRedisNode5Uri());
+
+ private Redis client;
+ private ClusterUtils cluster;
+
+ @Before
+ public void createClient() {
+ client = Redis.createClient(rule.vertx(), options);
+ cluster = new ClusterUtils(rule.vertx(), client);
+ }
+
+ @After
+ public void cleanRedis() {
+ client.close();
+ }
+
+ @Test
+ public void test(TestContext test) {
+ Async async = test.async();
+
+ // slot number: 16287
+ // keys hashing to the slot: x, exs
+ int slot = 16287;
+ String key1 = "x";
+ String key2 = "exs";
+
+ client.connect().compose(clusterConn -> {
+ return cluster.connectToMasterThatServesSlot(slot).compose(masterResult -> {
+ Redis master = masterResult.redis;
+ RedisConnection masterConn = masterResult.conn;
+ String masterId = masterResult.id;
+ return addNewMaster().compose(newMasterResult -> {
+ Redis newMaster = newMasterResult.redis;
+ RedisConnection newMasterConn = newMasterResult.conn;
+ String newMasterId = newMasterResult.id;
+ return clusterConn.send(Request.cmd(Command.SET).arg(key1).arg("fubar"))
+ .compose(ignored -> {
+ return clusterConn.send(Request.cmd(Command.SET).arg(key2).arg("quux"));
+ })
+ .compose(ignored -> {
+ return newMasterConn.send(Request.cmd(Command.CLUSTER).arg("SETSLOT").arg(slot).arg("IMPORTING").arg(masterId));
+ })
+ .compose(ignored -> {
+ return masterConn.send(Request.cmd(Command.CLUSTER).arg("SETSLOT").arg(slot).arg("MIGRATING").arg(newMasterId));
+ }).compose(ignored -> {
+ SocketAddress newMasterAddr = ((PooledRedisConnection) newMasterConn).actual().uri().socketAddress();
+ return masterConn.send(Request.cmd(Command.MIGRATE).arg(newMasterAddr.host()).arg(newMasterAddr.port())
+ .arg("").arg(0).arg(5000).arg("KEYS").arg(key1).arg(key2));
+ })
+ .compose(ignored -> {
+ return masterConn.send(Request.cmd(Command.CLUSTER).arg("SETSLOT").arg(slot).arg("NODE").arg(newMasterId));
+ })
+ .compose(ignored -> {
+ return newMasterConn.send(Request.cmd(Command.CLUSTER).arg("SETSLOT").arg(slot).arg("NODE").arg(newMasterId));
+ })
+ .compose(ignored -> {
+ return clusterConn.send(Request.cmd(Command.GET).arg(key1));
+ })
+ .compose(result -> {
+ test.assertEquals("fubar", result.toString());
+ return clusterConn.send(Request.cmd(Command.GET).arg(key2));
+ })
+ .compose(result -> {
+ test.assertEquals("quux", result.toString());
+ master.close();
+ newMaster.close();
+ return Future.succeededFuture();
+ });
+ });
+ });
+ }).onComplete(test.asyncAssertSuccess(ignored -> {
+ async.complete();
+ }));
+ }
+
+ private Future addNewMaster() {
+ return rule.vertx()
+ .executeBlocking(() -> {
+ redis.addMaster(7006);
+ return null;
+ })
+ .compose(ignored -> {
+ Redis client = Redis.createClient(rule.vertx(), "redis://127.0.0.1:7006");
+ return client
+ .connect()
+ .compose(conn -> {
+ return conn.send(Request.cmd(Command.CLUSTER).arg("MYID"))
+ .map(resp -> {
+ String newId = resp.toString();
+ return new ClusterUtils.Result(client, conn, newId);
+ });
+ });
+ });
+ }
+}
diff --git a/src/test/java/io/vertx/redis/client/test/RedisClusterTest.java b/src/test/java/io/vertx/redis/client/test/RedisClusterTest.java
index 7e7f7d6a..8df331f5 100644
--- a/src/test/java/io/vertx/redis/client/test/RedisClusterTest.java
+++ b/src/test/java/io/vertx/redis/client/test/RedisClusterTest.java
@@ -195,57 +195,6 @@ public void runTheSlotScope(TestContext should) {
});
}
- @Test(timeout = 30_000)
- public void autoFindNodeByMOVEDAndASK(TestContext should) {
- final Async test = should.async();
-
- final RedisOptions options = new RedisOptions()
- .setType(RedisClientType.CLUSTER)
- // we will flood the redis server
- .setMaxWaitingHandlers(128 * 1024)
- .addConnectionString(redis.getRedisNode0Uri())
- .addConnectionString(redis.getRedisNode2Uri())
- .addConnectionString(redis.getRedisNode4Uri())
- .setMaxPoolSize(8)
- .setMaxPoolWaiting(16);
-
- // we miss add the odd port nodes on purpose
-
- final Redis client2 = Redis.createClient(rule.vertx(), options);
-
- client2
- .connect().onComplete(onCreate -> {
- should.assertTrue(onCreate.succeeded());
-
- final RedisConnection cluster = onCreate.result();
- cluster.exceptionHandler(should::fail);
-
- final int len = (int) Math.pow(2, 17);
- final AtomicInteger counter = new AtomicInteger();
-
- for (int i = 0; i < len; i++) {
- final String id = Integer.toString(i);
- cluster.send(cmd(SET).arg(id).arg(id)).onComplete(set -> {
- should.assertTrue(set.succeeded());
- cluster.send(cmd(GET).arg(id)).onComplete(get -> {
- should.assertTrue(get.succeeded());
- should.assertEquals(id, get.result().toString());
-
- final int cnt = counter.incrementAndGet();
- if (cnt % 1024 == 0) {
- System.out.print('.');
- }
-
- if (cnt == len) {
- client2.close();
- test.complete();
- }
- });
- });
- }
- });
- }
-
@Test(timeout = 30_000)
public void autoFindNodes(TestContext should) {
final Async test = should.async();
diff --git a/src/test/java/io/vertx/redis/client/test/RedisSentinelMasterFailoverTest.java b/src/test/java/io/vertx/redis/client/test/RedisSentinelMasterFailoverTest.java
new file mode 100644
index 00000000..7105565e
--- /dev/null
+++ b/src/test/java/io/vertx/redis/client/test/RedisSentinelMasterFailoverTest.java
@@ -0,0 +1,55 @@
+package io.vertx.redis.client.test;
+
+import io.vertx.ext.unit.Async;
+import io.vertx.ext.unit.TestContext;
+import io.vertx.ext.unit.junit.RunTestOnContext;
+import io.vertx.ext.unit.junit.VertxUnitRunner;
+import io.vertx.redis.client.Command;
+import io.vertx.redis.client.Redis;
+import io.vertx.redis.client.RedisClientType;
+import io.vertx.redis.client.RedisOptions;
+import io.vertx.redis.client.RedisRole;
+import io.vertx.redis.client.Request;
+import io.vertx.redis.containers.RedisSentinel;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import static io.vertx.redis.client.test.TestUtils.retryUntilSuccess;
+
+@RunWith(VertxUnitRunner.class)
+public class RedisSentinelMasterFailoverTest {
+ @ClassRule
+ public static final RedisSentinel redis = new RedisSentinel();
+
+ @Rule
+ public final RunTestOnContext rule = new RunTestOnContext();
+
+ @Test
+ public void test(TestContext test) {
+ Async async = test.async();
+
+ Redis.createClient(
+ rule.vertx(),
+ new RedisOptions()
+ .setType(RedisClientType.SENTINEL)
+ .addConnectionString(redis.getRedisSentinel0Uri())
+ .addConnectionString(redis.getRedisSentinel1Uri())
+ .addConnectionString(redis.getRedisSentinel2Uri())
+ .setRole(RedisRole.MASTER)
+ .setAutoFailover(true))
+ .connect()
+ .onComplete(test.asyncAssertSuccess(conn -> {
+ conn.send(Request.cmd(Command.SET).arg("key").arg("value"))
+ .compose(ignored -> conn.send(Request.cmd(Command.SHUTDOWN)))
+ .onComplete(test.asyncAssertFailure(ignored -> { // connection closed
+ retryUntilSuccess(rule.vertx(), () -> conn.send(Request.cmd(Command.GET).arg("key")), 50)
+ .onComplete(test.asyncAssertSuccess(response -> {
+ test.assertEquals("value", response.toString());
+ async.complete();
+ }));
+ }));
+ }));
+ }
+}
diff --git a/src/test/java/io/vertx/redis/client/test/TestUtils.java b/src/test/java/io/vertx/redis/client/test/TestUtils.java
index 6b4f6690..6c54c886 100644
--- a/src/test/java/io/vertx/redis/client/test/TestUtils.java
+++ b/src/test/java/io/vertx/redis/client/test/TestUtils.java
@@ -14,6 +14,11 @@ public static String randomKey() {
public static Future retryUntilSuccess(Vertx vertx, Supplier> action, int maxRetries) {
Promise promise = Promise.promise();
+ retryUntilSuccess(vertx, action, maxRetries, promise);
+ return promise.future();
+ }
+
+ private static void retryUntilSuccess(Vertx vertx, Supplier> action, int maxRetries, Promise promise) {
action.get().onComplete(result -> {
if (result.succeeded()) {
promise.complete(result.result());
@@ -22,11 +27,10 @@ public static Future retryUntilSuccess(Vertx vertx, Supplier> a
promise.fail(result.cause());
} else {
vertx.setTimer(500, ignored -> {
- retryUntilSuccess(vertx, action, maxRetries - 1).onComplete(promise);
+ retryUntilSuccess(vertx, action, maxRetries - 1, promise);
});
}
}
});
- return promise.future();
}
}
diff --git a/src/test/java/io/vertx/redis/containers/RedisCluster.java b/src/test/java/io/vertx/redis/containers/RedisCluster.java
index d54ec812..035eed4f 100644
--- a/src/test/java/io/vertx/redis/containers/RedisCluster.java
+++ b/src/test/java/io/vertx/redis/containers/RedisCluster.java
@@ -6,6 +6,9 @@
import org.testcontainers.containers.FixedHostPortGenericContainer;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.containers.wait.strategy.WaitAllStrategy;
+
+import java.io.IOException;
public class RedisCluster implements TestRule {
private final GenericContainer> container = new FixedHostPortGenericContainer<>("quay.io/ladicek/redis-cluster")
@@ -15,7 +18,11 @@ public class RedisCluster implements TestRule {
.withFixedExposedPort(7003, 7003)
.withFixedExposedPort(7004, 7004)
.withFixedExposedPort(7005, 7005)
- .waitingFor(Wait.forLogMessage(".*Cluster state changed: ok.*", 6));
+ .withFixedExposedPort(7006, 7006) // possible extra node, not present by default
+ .withFixedExposedPort(7007, 7007) // possible extra node, not present by default
+ .waitingFor(new WaitAllStrategy()
+ .withStrategy(Wait.forLogMessage(".*Cluster state changed: ok.*", 6))
+ .withStrategy(Wait.forSuccessfulCommand("/cluster-slots-expected-lines.sh 7000 7005 30")));
@Override
public Statement apply(Statement base, Description description) {
@@ -49,4 +56,16 @@ public String getRedisNode5Uri() {
private String getRedisUri(int portNumber) {
return "redis://" + container.getHost() + ":" + container.getMappedPort(portNumber);
}
+
+ public void addMaster(int portNumber) {
+ execute("/cluster-add-master.sh", "" + portNumber);
+ }
+
+ private void execute(String... command) {
+ try {
+ container.execInContainer(command);
+ } catch (IOException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
}