Skip to content

Commit

Permalink
Add automatic failover of cluster connections
Browse files Browse the repository at this point in the history
Automatic failover only applies when a redirection points to a newly added cluster node.
Redirections to already known nodes are followed already.

WIP: still requires documentation.
  • Loading branch information
Ladicek committed Aug 6, 2024
1 parent de2ac45 commit 78eecbf
Show file tree
Hide file tree
Showing 6 changed files with 316 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public class RedisClusterConnectOptions extends RedisConnectOptions {

private RedisReplicas useReplicas;
private long hashSlotCacheTTL;
private boolean autoFailover;

public RedisClusterConnectOptions() {
super();
Expand All @@ -38,12 +39,14 @@ public RedisClusterConnectOptions(RedisOptions options) {
super(options);
setUseReplicas(options.getUseReplicas());
setHashSlotCacheTTL(options.getHashSlotCacheTTL());
setAutoFailover(options.isAutoFailover());
}

public RedisClusterConnectOptions(RedisClusterConnectOptions other) {
super(other);
setUseReplicas(other.getUseReplicas());
setHashSlotCacheTTL(other.getHashSlotCacheTTL());
setAutoFailover(other.isAutoFailover());
}

public RedisClusterConnectOptions(JsonObject json) {
Expand Down Expand Up @@ -92,6 +95,14 @@ public RedisClusterConnectOptions setHashSlotCacheTTL(long hashSlotCacheTTL) {
return this;
}

/**
 * Tells whether automatic failover of cluster connections is enabled.
 *
 * @return the current value of the {@code autoFailover} flag
 */
public boolean isAutoFailover() {
  return this.autoFailover;
}

/**
 * Enables or disables automatic failover of cluster connections. When enabled,
 * the cluster client wraps the connections it creates so that a command failing
 * because of an unknown cluster endpoint triggers a reconnect and a retry.
 *
 * <p>Returns {@code this} so the setter can be chained, like the other setters of
 * this class.
 *
 * @param autoFailover {@code true} to enable automatic failover
 * @return fluent self
 */
public RedisClusterConnectOptions setAutoFailover(boolean autoFailover) {
  this.autoFailover = autoFailover;
  return this;
}

@Override
public RedisClusterConnectOptions setMaxNestedArrays(int maxNestedArrays) {
return (RedisClusterConnectOptions) super.setMaxNestedArrays(maxNestedArrays);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,11 @@ private void connectionComplete(AtomicInteger counter, Slots slots, Map<String,
}
onConnected.handle(Future.failedFuture(new RedisConnectException(message.toString())));
} else {
onConnected.handle(Future.succeededFuture(new RedisClusterConnection(vertx, connectOptions, sharedSlots,
connections)));
RedisConnection conn = new RedisClusterConnection(vertx, connectOptions, sharedSlots, connections);
if (connectOptions.isAutoFailover()) {
conn = new RedisClusterFailoverConnection(this, conn);
}
onConnected.handle(Future.succeededFuture(conn));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ private Map<Integer, Request> splitRequest(CommandImpl cmd, List<byte[]> args) {
private void send(String endpoint, int retries, Request command, Handler<AsyncResult<Response>> handler) {
PooledRedisConnection connection = connections.get(endpoint);
if (connection == null) {
handler.handle(Future.failedFuture("Missing connection to: " + endpoint));
handler.handle(Future.failedFuture(new RedisUnknownEndpointException("Missing connection to: " + endpoint)));
return;
}

Expand Down Expand Up @@ -310,8 +310,7 @@ private void send(String endpoint, int retries, Request command, Handler<AsyncRe
send(newEndpoint, retries - 1, command, handler);
}
} else {
// unknown node, not sure how to handle that right now
handler.handle(Future.failedFuture("Unknown node on redirection, must reconnect: " + cause));
handler.handle(Future.failedFuture(new RedisUnknownEndpointException("Unknown node on redirection, must reconnect: " + cause)));
}
return;
}
Expand Down Expand Up @@ -434,7 +433,7 @@ private Future<List<Response>> batch(List<Request> requests, Slots slots) {
private void batch(String endpoint, int retries, List<Request> commands, Handler<AsyncResult<List<Response>>> handler) {
RedisConnection connection = connections.get(endpoint);
if (connection == null) {
handler.handle(Future.failedFuture("Missing connection to: " + endpoint));
handler.handle(Future.failedFuture(new RedisUnknownEndpointException("Missing connection to: " + endpoint)));
return;
}

Expand Down Expand Up @@ -477,8 +476,7 @@ private void batch(String endpoint, int retries, List<Request> commands, Handler
batch(newEndpoint, retries - 1, commands, handler);
}
} else {
// unknown node, not sure how to handle that right now
handler.handle(Future.failedFuture("Unknown node on redirection, must reconnect: " + cause));
handler.handle(Future.failedFuture(new RedisUnknownEndpointException("Unknown node on redirection, must reconnect: " + cause)));
}
return;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package io.vertx.redis.client.impl;

import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.internal.logging.Logger;
import io.vertx.core.internal.logging.LoggerFactory;
import io.vertx.redis.client.RedisConnection;
import io.vertx.redis.client.Request;
import io.vertx.redis.client.Response;

import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

public class RedisClusterFailoverConnection implements RedisConnection {
private static final Logger LOG = LoggerFactory.getLogger(RedisClusterFailoverConnection.class);

private static final int RETRIES = 4;

private final RedisClusterClient client;
private final AtomicReference<RedisConnection> delegate;

public RedisClusterFailoverConnection(RedisClusterClient client, RedisConnection connection) {
this.client = client;
this.delegate = new AtomicReference<>(connection);
}

@Override
public RedisConnection exceptionHandler(Handler<Throwable> handler) {
delegate.get().exceptionHandler(handler);
return this;
}

@Override
public RedisConnection handler(@Nullable Handler<Response> handler) {
delegate.get().handler(handler);
return this;
}

@Override
public RedisConnection pause() {
delegate.get().pause();
return this;
}

@Override
public RedisConnection resume() {
delegate.get().resume();
return this;
}

@Override
public RedisConnection fetch(long amount) {
delegate.get().fetch(amount);
return this;
}

@Override
public RedisConnection endHandler(@Nullable Handler<Void> endHandler) {
delegate.get().endHandler(endHandler);
return this;
}

@Override
public Future<@Nullable Response> send(Request command) {
Promise<Response> promise = Promise.promise();
doSend(command, RETRIES, null, promise);
return promise.future();
}

private void doSend(Request request, int retryAttempts, Throwable lastFailure, Promise<Response> promise) {
if (retryAttempts == 0) {
promise.fail(lastFailure);
return;
}

delegate.get().send(request)
.onSuccess(promise::complete)
.onFailure(failure -> {
if (failure instanceof RedisUnknownEndpointException) {
LOG.debug("Reconnecting due to: " + failure);
reconnect()
.onSuccess(ignored -> doSend(request, retryAttempts - 1, failure, promise))
.onFailure(promise::fail);
} else {
promise.fail(failure);
}
});
}

@Override
public Future<List<@Nullable Response>> batch(List<Request> commands) {
Promise<List<Response>> promise = Promise.promise();
doBatch(commands, RETRIES, null, promise);
return promise.future();
}

private void doBatch(List<Request> requests, int retryAttempts, Throwable lastFailure, Promise<List<Response>> promise) {
if (retryAttempts == 0) {
promise.fail(lastFailure);
return;
}

delegate.get().batch(requests)
.onSuccess(promise::complete)
.onFailure(failure -> {
if (failure instanceof RedisUnknownEndpointException) {
LOG.debug("Reconnecting due to: " + failure);
reconnect()
.onSuccess(ignored -> doBatch(requests, retryAttempts - 1, failure, promise))
.onFailure(promise::fail);
} else {
promise.fail(failure);
}
});
}

private Future<Void> reconnect() {
return delegate.get().close()
.recover(err -> {
LOG.warn("Failed closing old connection: " + err);
return Future.succeededFuture();
})
.compose(ignored -> client.connect())
.map(conn -> {
delegate.set(conn);
return null;
});
}

@Override
public Future<Void> close() {
return delegate.get().close();
}

@Override
public boolean pendingQueueFull() {
return delegate.get().pendingQueueFull();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package io.vertx.redis.client.impl;

/**
 * Raised by the Redis cluster client when a command targets an endpoint that the
 * client does not know. The most likely cause is a node that was added to the
 * cluster after the connection was established; reconnecting is then required.
 */
class RedisUnknownEndpointException extends RedisConnectException {

  /**
   * @param msg description of the unknown endpoint situation
   */
  RedisUnknownEndpointException(String msg) {
    super(msg);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package io.vertx.redis.client.test;

import io.vertx.core.Future;
import io.vertx.core.internal.logging.Logger;
import io.vertx.core.internal.logging.LoggerFactory;
import io.vertx.core.net.SocketAddress;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.RunTestOnContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.redis.client.Command;
import io.vertx.redis.client.Redis;
import io.vertx.redis.client.RedisClientType;
import io.vertx.redis.client.RedisConnection;
import io.vertx.redis.client.RedisOptions;
import io.vertx.redis.client.RedisReplicas;
import io.vertx.redis.client.Request;
import io.vertx.redis.client.impl.PooledRedisConnection;
import io.vertx.redis.containers.RedisCluster;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;

import java.util.concurrent.TimeUnit;

/**
 * Integration test for automatic failover of cluster connections: a hash slot is
 * moved to a master that is added to the cluster after the client has connected,
 * and the keys of that slot are then read through the original cluster connection.
 */
@RunWith(VertxUnitRunner.class)
public class RedisClusterMovedToNewNodeTest {
// Redis cluster shared by all tests of this class.
@ClassRule
public static final RedisCluster redis = new RedisCluster();

@Rule
public final RunTestOnContext rule = new RunTestOnContext();

// Cluster client options: never use replicas, auto failover enabled, and the six
// cluster nodes as connection strings.
private final RedisOptions options = new RedisOptions()
.setType(RedisClientType.CLUSTER)
.setUseReplicas(RedisReplicas.NEVER)
.setAutoFailover(true)
.addConnectionString(redis.getRedisNode0Uri())
.addConnectionString(redis.getRedisNode1Uri())
.addConnectionString(redis.getRedisNode2Uri())
.addConnectionString(redis.getRedisNode3Uri())
.addConnectionString(redis.getRedisNode4Uri())
.addConnectionString(redis.getRedisNode5Uri());

private Redis client;
private ClusterUtils cluster;

// Creates a fresh client and the cluster helper before each test.
@Before
public void createClient() {
client = Redis.createClient(rule.vertx(), options);
cluster = new ClusterUtils(rule.vertx(), client);
}

// Closes the client after each test; the result of close() is not awaited.
@After
public void cleanRedis() {
client.close();
}

/**
 * Writes two keys of the same slot, migrates the slot to a newly added master and
 * verifies that both keys can still be read through the cluster connection.
 */
@Test
public void test(TestContext test) {
Async async = test.async();

// slot number: 16287
// keys hashing to the slot: x, exs
int slot = 16287;
String key1 = "x";
String key2 = "exs";

client.connect().compose(clusterConn -> {
// direct connection to the master currently serving the slot (presumably, judging
// by the helper's name -- the helper is not shown here)
return cluster.connectToMasterThatServesSlot(slot).compose(masterResult -> {
Redis master = masterResult.redis;
RedisConnection masterConn = masterResult.conn;
String masterId = masterResult.id;
// add a new master; clusterConn was created before this node existed
return addNewMaster().compose(newMasterResult -> {
Redis newMaster = newMasterResult.redis;
RedisConnection newMasterConn = newMasterResult.conn;
String newMasterId = newMasterResult.id;
// store both keys through the cluster connection
return clusterConn.send(Request.cmd(Command.SET).arg(key1).arg("fubar"))
.compose(ignored -> {
return clusterConn.send(Request.cmd(Command.SET).arg(key2).arg("quux"));
})
// new master: CLUSTER SETSLOT <slot> IMPORTING <current master id>
.compose(ignored -> {
return newMasterConn.send(Request.cmd(Command.CLUSTER).arg("SETSLOT").arg(slot).arg("IMPORTING").arg(masterId));
})
// current master: CLUSTER SETSLOT <slot> MIGRATING <new master id>
.compose(ignored -> {
return masterConn.send(Request.cmd(Command.CLUSTER).arg("SETSLOT").arg(slot).arg("MIGRATING").arg(newMasterId));
})
.compose(ignored -> {
// TODO why??? (open question from the author: the reason for this 2 second
// wait before MIGRATE is not established)
return rule.vertx().timer(2000, TimeUnit.MILLISECONDS);
}).compose(ignored -> {
// move both keys to the new master: MIGRATE host port "" 0 5000 KEYS key1 key2
SocketAddress newMasterAddr = ((PooledRedisConnection) newMasterConn).actual().uri().socketAddress();
return masterConn.send(Request.cmd(Command.MIGRATE).arg(newMasterAddr.host()).arg(newMasterAddr.port())
.arg("").arg(0).arg(5000).arg("KEYS").arg(key1).arg(key2));
})
// assign the slot to the new master, first on the old master, then on the new one
.compose(ignored -> {
return masterConn.send(Request.cmd(Command.CLUSTER).arg("SETSLOT").arg(slot).arg("NODE").arg(newMasterId));
})
.compose(ignored -> {
return newMasterConn.send(Request.cmd(Command.CLUSTER).arg("SETSLOT").arg(slot).arg("NODE").arg(newMasterId));
})
// read both keys back through the cluster connection, which was established
// before the new master was added
.compose(ignored -> {
return clusterConn.send(Request.cmd(Command.GET).arg(key1));
})
.compose(result -> {
test.assertEquals("fubar", result.toString());
return clusterConn.send(Request.cmd(Command.GET).arg(key2));
})
.compose(result -> {
test.assertEquals("quux", result.toString());
// the helper clients are closed only on this success path
master.close();
newMaster.close();
return Future.succeededFuture();
});
});
});
}).onComplete(test.asyncAssertSuccess(ignored -> {
async.complete();
}));
}

/**
 * Adds a master on port 7006 to the cluster (on a worker thread via executeBlocking),
 * connects to it directly and asks for its node id with CLUSTER MYID.
 *
 * @return a future with the client, the connection and the node id of the new master
 */
private Future<ClusterUtils.Result> addNewMaster() {
return rule.vertx()
.executeBlocking(() -> {
redis.addMaster(7006);
return null;
})
.compose(ignored -> {
// standalone client pointing at the new node only
Redis client = Redis.createClient(rule.vertx(), "redis://127.0.0.1:7006");
return client
.connect()
.compose(conn -> {
return conn.send(Request.cmd(Command.CLUSTER).arg("MYID"))
.map(resp -> {
String newId = resp.toString();
return new ClusterUtils.Result(client, conn, newId);
});
});
});
}
}

0 comments on commit 78eecbf

Please sign in to comment.