From 78eecbfcc6cb60a47757ea05cc4b9615225a2e09 Mon Sep 17 00:00:00 2001 From: Ladislav Thon Date: Tue, 6 Aug 2024 17:27:07 +0200 Subject: [PATCH] Add automatic failover of cluster connections This only applies in case of newly added cluster nodes. Redirections to existing nodes are followed already. WIP requires documentation WIP --- .../client/RedisClusterConnectOptions.java | 11 ++ .../redis/client/impl/RedisClusterClient.java | 7 +- .../client/impl/RedisClusterConnection.java | 10 +- .../impl/RedisClusterFailoverConnection.java | 141 +++++++++++++++++ .../impl/RedisUnknownEndpointException.java | 11 ++ .../test/RedisClusterMovedToNewNodeTest.java | 144 ++++++++++++++++++ 6 files changed, 316 insertions(+), 8 deletions(-) create mode 100644 src/main/java/io/vertx/redis/client/impl/RedisClusterFailoverConnection.java create mode 100644 src/main/java/io/vertx/redis/client/impl/RedisUnknownEndpointException.java create mode 100644 src/test/java/io/vertx/redis/client/test/RedisClusterMovedToNewNodeTest.java diff --git a/src/main/java/io/vertx/redis/client/RedisClusterConnectOptions.java b/src/main/java/io/vertx/redis/client/RedisClusterConnectOptions.java index c3eeaf6e..a015c8b3 100644 --- a/src/main/java/io/vertx/redis/client/RedisClusterConnectOptions.java +++ b/src/main/java/io/vertx/redis/client/RedisClusterConnectOptions.java @@ -27,6 +27,7 @@ public class RedisClusterConnectOptions extends RedisConnectOptions { private RedisReplicas useReplicas; private long hashSlotCacheTTL; + private boolean autoFailover; public RedisClusterConnectOptions() { super(); @@ -38,12 +39,14 @@ public RedisClusterConnectOptions(RedisOptions options) { super(options); setUseReplicas(options.getUseReplicas()); setHashSlotCacheTTL(options.getHashSlotCacheTTL()); + setAutoFailover(options.isAutoFailover()); } public RedisClusterConnectOptions(RedisClusterConnectOptions other) { super(other); setUseReplicas(other.getUseReplicas()); setHashSlotCacheTTL(other.getHashSlotCacheTTL()); + setAutoFailover(other.isAutoFailover()); } public RedisClusterConnectOptions(JsonObject json) { @@ -92,6 +95,14 @@ public RedisClusterConnectOptions setHashSlotCacheTTL(long hashSlotCacheTTL) { return this; } + public boolean isAutoFailover() { + return autoFailover; + } + + public void setAutoFailover(boolean autoFailover) { + this.autoFailover = autoFailover; + } + @Override public RedisClusterConnectOptions setMaxNestedArrays(int maxNestedArrays) { return (RedisClusterConnectOptions) super.setMaxNestedArrays(maxNestedArrays); diff --git a/src/main/java/io/vertx/redis/client/impl/RedisClusterClient.java b/src/main/java/io/vertx/redis/client/impl/RedisClusterClient.java index ac3577e4..3a0b3054 100644 --- a/src/main/java/io/vertx/redis/client/impl/RedisClusterClient.java +++ b/src/main/java/io/vertx/redis/client/impl/RedisClusterClient.java @@ -202,8 +202,11 @@ private void connectionComplete(AtomicInteger counter, Slots slots, Map splitRequest(CommandImpl cmd, List args) { private void send(String endpoint, int retries, Request command, Handler> handler) { PooledRedisConnection connection = connections.get(endpoint); if (connection == null) { - handler.handle(Future.failedFuture("Missing connection to: " + endpoint)); + handler.handle(Future.failedFuture(new RedisUnknownEndpointException("Missing connection to: " + endpoint))); return; } @@ -310,8 +310,7 @@ private void send(String endpoint, int retries, Request command, Handler> batch(List requests, Slots slots) { private void batch(String endpoint, int retries, List commands, Handler>> handler) { RedisConnection connection = connections.get(endpoint); if (connection == null) { - handler.handle(Future.failedFuture("Missing connection to: " + endpoint)); + handler.handle(Future.failedFuture(new RedisUnknownEndpointException("Missing connection to: " + endpoint))); return; } @@ -477,8 +476,7 @@ private void batch(String endpoint, int retries, List commands, Handler batch(newEndpoint, retries - 1, commands, handler); } } else { - // unknown node, not sure how to handle that right now - handler.handle(Future.failedFuture("Unknown node on redirection, must reconnect: " + cause)); + handler.handle(Future.failedFuture(new RedisUnknownEndpointException("Unknown node on redirection, must reconnect: " + cause))); } return; } diff --git a/src/main/java/io/vertx/redis/client/impl/RedisClusterFailoverConnection.java b/src/main/java/io/vertx/redis/client/impl/RedisClusterFailoverConnection.java new file mode 100644 index 00000000..854d4ad9 --- /dev/null +++ b/src/main/java/io/vertx/redis/client/impl/RedisClusterFailoverConnection.java @@ -0,0 +1,141 @@ +package io.vertx.redis.client.impl; + +import io.vertx.codegen.annotations.Nullable; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.Promise; +import io.vertx.core.internal.logging.Logger; +import io.vertx.core.internal.logging.LoggerFactory; +import io.vertx.redis.client.RedisConnection; +import io.vertx.redis.client.Request; +import io.vertx.redis.client.Response; + +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; + +public class RedisClusterFailoverConnection implements RedisConnection { + private static final Logger LOG = LoggerFactory.getLogger(RedisClusterFailoverConnection.class); + + private static final int RETRIES = 4; + + private final RedisClusterClient client; + private final AtomicReference delegate; + + public RedisClusterFailoverConnection(RedisClusterClient client, RedisConnection connection) { + this.client = client; + this.delegate = new AtomicReference<>(connection); + } + + @Override + public RedisConnection exceptionHandler(Handler handler) { + delegate.get().exceptionHandler(handler); + return this; + } + + @Override + public RedisConnection handler(@Nullable Handler handler) { + delegate.get().handler(handler); + return this; + } + + @Override + public RedisConnection pause() { + delegate.get().pause(); + return this; + } + + @Override + public RedisConnection resume() { + delegate.get().resume(); + return this; + } + + @Override + public RedisConnection fetch(long amount) { + delegate.get().fetch(amount); + return this; + } + + @Override + public RedisConnection endHandler(@Nullable Handler endHandler) { + delegate.get().endHandler(endHandler); + return this; + } + + @Override + public Future<@Nullable Response> send(Request command) { + Promise promise = Promise.promise(); + doSend(command, RETRIES, null, promise); + return promise.future(); + } + + private void doSend(Request request, int retryAttempts, Throwable lastFailure, Promise promise) { + if (retryAttempts == 0) { + promise.fail(lastFailure); + return; + } + + delegate.get().send(request) + .onSuccess(promise::complete) + .onFailure(failure -> { + if (failure instanceof RedisUnknownEndpointException) { + LOG.debug("Reconnecting due to: " + failure); + reconnect() + .onSuccess(ignored -> doSend(request, retryAttempts - 1, failure, promise)) + .onFailure(promise::fail); + } else { + promise.fail(failure); + } + }); + } + + @Override + public Future> batch(List commands) { + Promise> promise = Promise.promise(); + doBatch(commands, RETRIES, null, promise); + return promise.future(); + } + + private void doBatch(List requests, int retryAttempts, Throwable lastFailure, Promise> promise) { + if (retryAttempts == 0) { + promise.fail(lastFailure); + return; + } + + delegate.get().batch(requests) + .onSuccess(promise::complete) + .onFailure(failure -> { + if (failure instanceof RedisUnknownEndpointException) { + LOG.debug("Reconnecting due to: " + failure); + reconnect() + .onSuccess(ignored -> doBatch(requests, retryAttempts - 1, failure, promise)) + .onFailure(promise::fail); + } else { + promise.fail(failure); + } + }); + } + + private Future reconnect() { + return delegate.get().close() + .recover(err -> { + LOG.warn("Failed closing old connection: " + err); + return Future.succeededFuture(); + }) + .compose(ignored -> client.connect()) + .map(conn -> { + delegate.set(conn); + return null; + }); + } + + @Override + public Future close() { + return delegate.get().close(); + } + + @Override + public boolean pendingQueueFull() { + return delegate.get().pendingQueueFull(); + } +} diff --git a/src/main/java/io/vertx/redis/client/impl/RedisUnknownEndpointException.java b/src/main/java/io/vertx/redis/client/impl/RedisUnknownEndpointException.java new file mode 100644 index 00000000..0200cc4b --- /dev/null +++ b/src/main/java/io/vertx/redis/client/impl/RedisUnknownEndpointException.java @@ -0,0 +1,11 @@ +package io.vertx.redis.client.impl; + +/** + * Signals an unknown endpoint in the Redis cluster client. This most likely + * means that a new node was added to the cluster and a reconnection is needed. + */ +class RedisUnknownEndpointException extends RedisConnectException { + RedisUnknownEndpointException(String msg) { + super(msg); + } +} diff --git a/src/test/java/io/vertx/redis/client/test/RedisClusterMovedToNewNodeTest.java b/src/test/java/io/vertx/redis/client/test/RedisClusterMovedToNewNodeTest.java new file mode 100644 index 00000000..a70ef514 --- /dev/null +++ b/src/test/java/io/vertx/redis/client/test/RedisClusterMovedToNewNodeTest.java @@ -0,0 +1,144 @@ +package io.vertx.redis.client.test; + +import io.vertx.core.Future; +import io.vertx.core.internal.logging.Logger; +import io.vertx.core.internal.logging.LoggerFactory; +import io.vertx.core.net.SocketAddress; +import io.vertx.ext.unit.Async; +import io.vertx.ext.unit.TestContext; +import io.vertx.ext.unit.junit.RunTestOnContext; +import io.vertx.ext.unit.junit.VertxUnitRunner; +import io.vertx.redis.client.Command; +import io.vertx.redis.client.Redis; +import io.vertx.redis.client.RedisClientType; +import io.vertx.redis.client.RedisConnection; +import io.vertx.redis.client.RedisOptions; +import io.vertx.redis.client.RedisReplicas; +import io.vertx.redis.client.Request; +import io.vertx.redis.client.impl.PooledRedisConnection; +import io.vertx.redis.containers.RedisCluster; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; + +import java.util.concurrent.TimeUnit; + +@RunWith(VertxUnitRunner.class) +public class RedisClusterMovedToNewNodeTest { + @ClassRule + public static final RedisCluster redis = new RedisCluster(); + + @Rule + public final RunTestOnContext rule = new RunTestOnContext(); + + private final RedisOptions options = new RedisOptions() + .setType(RedisClientType.CLUSTER) + .setUseReplicas(RedisReplicas.NEVER) + .setAutoFailover(true) + .addConnectionString(redis.getRedisNode0Uri()) + .addConnectionString(redis.getRedisNode1Uri()) + .addConnectionString(redis.getRedisNode2Uri()) + .addConnectionString(redis.getRedisNode3Uri()) + .addConnectionString(redis.getRedisNode4Uri()) + .addConnectionString(redis.getRedisNode5Uri()); + + private Redis client; + private ClusterUtils cluster; + + @Before + public void createClient() { + client = Redis.createClient(rule.vertx(), options); + cluster = new ClusterUtils(rule.vertx(), client); + } + + @After + public void cleanRedis() { + client.close(); + } + + @Test + public void test(TestContext test) { + Async async = test.async(); + + // slot number: 16287 + // keys hashing to the slot: x, exs + int slot = 16287; + String key1 = "x"; + String key2 = "exs"; + + client.connect().compose(clusterConn -> { + return cluster.connectToMasterThatServesSlot(slot).compose(masterResult -> { + Redis master = masterResult.redis; + RedisConnection masterConn = masterResult.conn; + String masterId = masterResult.id; + return addNewMaster().compose(newMasterResult -> { + Redis newMaster = newMasterResult.redis; + RedisConnection newMasterConn = newMasterResult.conn; + String newMasterId = newMasterResult.id; + return clusterConn.send(Request.cmd(Command.SET).arg(key1).arg("fubar")) + .compose(ignored -> { + return clusterConn.send(Request.cmd(Command.SET).arg(key2).arg("quux")); + }) + .compose(ignored -> { + return newMasterConn.send(Request.cmd(Command.CLUSTER).arg("SETSLOT").arg(slot).arg("IMPORTING").arg(masterId)); + }) + .compose(ignored -> { + return masterConn.send(Request.cmd(Command.CLUSTER).arg("SETSLOT").arg(slot).arg("MIGRATING").arg(newMasterId)); + }) + .compose(ignored -> { + // TODO why??? + return rule.vertx().timer(2000, TimeUnit.MILLISECONDS); + }).compose(ignored -> { + SocketAddress newMasterAddr = ((PooledRedisConnection) newMasterConn).actual().uri().socketAddress(); + return masterConn.send(Request.cmd(Command.MIGRATE).arg(newMasterAddr.host()).arg(newMasterAddr.port()) + .arg("").arg(0).arg(5000).arg("KEYS").arg(key1).arg(key2)); + }) + .compose(ignored -> { + return masterConn.send(Request.cmd(Command.CLUSTER).arg("SETSLOT").arg(slot).arg("NODE").arg(newMasterId)); + }) + .compose(ignored -> { + return newMasterConn.send(Request.cmd(Command.CLUSTER).arg("SETSLOT").arg(slot).arg("NODE").arg(newMasterId)); + }) + .compose(ignored -> { + return clusterConn.send(Request.cmd(Command.GET).arg(key1)); + }) + .compose(result -> { + test.assertEquals("fubar", result.toString()); + return clusterConn.send(Request.cmd(Command.GET).arg(key2)); + }) + .compose(result -> { + test.assertEquals("quux", result.toString()); + master.close(); + newMaster.close(); + return Future.succeededFuture(); + }); + }); + }); + }).onComplete(test.asyncAssertSuccess(ignored -> { + async.complete(); + })); + } + + private Future addNewMaster() { + return rule.vertx() + .executeBlocking(() -> { + redis.addMaster(7006); + return null; + }) + .compose(ignored -> { + Redis client = Redis.createClient(rule.vertx(), "redis://127.0.0.1:7006"); + return client + .connect() + .compose(conn -> { + return conn.send(Request.cmd(Command.CLUSTER).arg("MYID")) + .map(resp -> { + String newId = resp.toString(); + return new ClusterUtils.Result(client, conn, newId); + }); + }); + }); + } +}