Skip to content

Commit

Permalink
Add automatic connection to new (previously unknown) cluster nodes
Browse files Browse the repository at this point in the history
Redirections to existing nodes are handled already.
  • Loading branch information
Ladicek committed Aug 7, 2024
1 parent de2ac45 commit 3360bbe
Show file tree
Hide file tree
Showing 3 changed files with 201 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,8 @@ private void connectionComplete(AtomicInteger counter, Slots slots, Map<String,
}
onConnected.handle(Future.failedFuture(new RedisConnectException(message.toString())));
} else {
onConnected.handle(Future.succeededFuture(new RedisClusterConnection(vertx, connectOptions, sharedSlots,
connections)));
onConnected.handle(Future.succeededFuture(new RedisClusterConnection(vertx, connectionManager,
connectOptions, sharedSlots, connections)));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,15 @@ public static void addMasterOnlyCommand(Command command) {
}

private final VertxInternal vertx;
private final RedisConnectionManager connectionManager;
private final RedisClusterConnectOptions connectOptions;
private final SharedSlots sharedSlots;
private final Map<String, PooledRedisConnection> connections;

RedisClusterConnection(Vertx vertx, RedisClusterConnectOptions connectOptions, SharedSlots sharedSlots,
Map<String, PooledRedisConnection> connections) {
RedisClusterConnection(Vertx vertx, RedisConnectionManager connectionManager, RedisClusterConnectOptions connectOptions,
SharedSlots sharedSlots, Map<String, PooledRedisConnection> connections) {
this.vertx = (VertxInternal) vertx;
this.connectionManager = connectionManager;
this.connectOptions = connectOptions;
this.sharedSlots = sharedSlots;
this.connections = connections;
Expand Down Expand Up @@ -267,7 +269,25 @@ private Map<Integer, Request> splitRequest(CommandImpl cmd, List<byte[]> args) {
private void send(String endpoint, int retries, Request command, Handler<AsyncResult<Response>> handler) {
PooledRedisConnection connection = connections.get(endpoint);
if (connection == null) {
handler.handle(Future.failedFuture("Missing connection to: " + endpoint));
connectionManager.getConnection(endpoint, RedisReplicas.NEVER != connectOptions.getUseReplicas() ? Request.cmd(Command.READONLY) : null)
.onSuccess(conn -> {
synchronized (connections) {
if (connections.containsKey(endpoint)) {
conn.close()
.onFailure(t -> LOG.warn("Failed closing connection: " + t));
} else {
connections.put(endpoint, conn);
}
}
send(endpoint, retries, command, handler);
})
.onFailure(t -> {
if (retries > 0) {
send(endpoint, retries - 1, command, handler);
} else {
handler.handle(Future.failedFuture("Failed obtaining connection to: " + endpoint));
}
});
return;
}

Expand Down Expand Up @@ -298,20 +318,16 @@ private void send(String endpoint, int retries, Request command, Handler<AsyncRe
addr = uri.socketAddress().host() + addr;
}
String newEndpoint = uri.protocol() + "://" + uri.userinfo() + addr;
PooledRedisConnection newConnection = connections.get(newEndpoint);
if (newConnection != null) {
if (ask) {
newConnection.send(Request.cmd(Command.ASKING))
.onFailure(err -> handler.handle(Future.failedFuture("Failed ASKING: " + err + ", caused by " + cause)))
.onSuccess(asking -> {
send(newEndpoint, retries - 1, command, handler);
});
} else {
send(newEndpoint, retries - 1, command, handler);
}
if (ask) {
send(newEndpoint, retries - 1, Request.cmd(Command.ASKING), resp -> {
if (resp.failed()) {
handler.handle(Future.failedFuture("Failed ASKING: " + resp.cause() + ", caused by " + cause));
} else {
send(newEndpoint, retries - 1, command, handler);
}
});
} else {
// unknown node, not sure how to handle that right now
handler.handle(Future.failedFuture("Unknown node on redirection, must reconnect: " + cause));
send(newEndpoint, retries - 1, command, handler);
}
return;
}
Expand Down Expand Up @@ -434,7 +450,25 @@ private Future<List<Response>> batch(List<Request> requests, Slots slots) {
private void batch(String endpoint, int retries, List<Request> commands, Handler<AsyncResult<List<Response>>> handler) {
RedisConnection connection = connections.get(endpoint);
if (connection == null) {
handler.handle(Future.failedFuture("Missing connection to: " + endpoint));
connectionManager.getConnection(endpoint, RedisReplicas.NEVER != connectOptions.getUseReplicas() ? Request.cmd(Command.READONLY) : null)
.onSuccess(conn -> {
synchronized (connections) {
if (connections.containsKey(endpoint)) {
conn.close()
.onFailure(t -> LOG.warn("Failed closing connection: " + t));
} else {
connections.put(endpoint, conn);
}
}
batch(endpoint, retries, commands, handler);
})
.onFailure(t -> {
if (retries > 0) {
batch(endpoint, retries - 1, commands, handler);
} else {
handler.handle(Future.failedFuture("Failed obtaining connection to: " + endpoint));
}
});
return;
}

Expand Down Expand Up @@ -465,20 +499,16 @@ private void batch(String endpoint, int retries, List<Request> commands, Handler
addr = uri.socketAddress().host() + addr;
}
String newEndpoint = uri.protocol() + "://" + uri.userinfo() + addr;
PooledRedisConnection newConnection = connections.get(newEndpoint);
if (newConnection != null) {
if (ask) {
newConnection.send(Request.cmd(Command.ASKING))
.onFailure(err -> handler.handle(Future.failedFuture("Failed ASKING: " + err + ", caused by " + cause)))
.onSuccess(asking -> {
batch(newEndpoint, retries - 1, commands, handler);
});
} else {
batch(newEndpoint, retries - 1, commands, handler);
}
if (ask) {
batch(newEndpoint, retries - 1, Collections.singletonList(Request.cmd(Command.ASKING)), resp -> {
if (resp.failed()) {
handler.handle(Future.failedFuture("Failed ASKING: " + resp.cause() + ", caused by " + cause));
} else {
batch(newEndpoint, retries - 1, commands, handler);
}
});
} else {
// unknown node, not sure how to handle that right now
handler.handle(Future.failedFuture("Unknown node on redirection, must reconnect: " + cause));
batch(newEndpoint, retries - 1, commands, handler);
}
return;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package io.vertx.redis.client.test;

import io.vertx.core.Future;
import io.vertx.core.internal.logging.Logger;
import io.vertx.core.internal.logging.LoggerFactory;
import io.vertx.core.net.SocketAddress;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.RunTestOnContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.redis.client.Command;
import io.vertx.redis.client.Redis;
import io.vertx.redis.client.RedisClientType;
import io.vertx.redis.client.RedisConnection;
import io.vertx.redis.client.RedisOptions;
import io.vertx.redis.client.RedisReplicas;
import io.vertx.redis.client.Request;
import io.vertx.redis.client.impl.PooledRedisConnection;
import io.vertx.redis.containers.RedisCluster;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;

import java.util.concurrent.TimeUnit;

@RunWith(VertxUnitRunner.class)
public class RedisClusterMovedToNewNodeTest {
@ClassRule
public static final RedisCluster redis = new RedisCluster();

@Rule
public final RunTestOnContext rule = new RunTestOnContext();

private final RedisOptions options = new RedisOptions()
.setType(RedisClientType.CLUSTER)
.setUseReplicas(RedisReplicas.NEVER)
.addConnectionString(redis.getRedisNode0Uri())
.addConnectionString(redis.getRedisNode1Uri())
.addConnectionString(redis.getRedisNode2Uri())
.addConnectionString(redis.getRedisNode3Uri())
.addConnectionString(redis.getRedisNode4Uri())
.addConnectionString(redis.getRedisNode5Uri());

private Redis client;
private ClusterUtils cluster;

@Before
public void createClient() {
client = Redis.createClient(rule.vertx(), options);
cluster = new ClusterUtils(rule.vertx(), client);
}

@After
public void cleanRedis() {
client.close();
}

@Test
public void test(TestContext test) {
Async async = test.async();

// slot number: 16287
// keys hashing to the slot: x, exs
int slot = 16287;
String key1 = "x";
String key2 = "exs";

client.connect().compose(clusterConn -> {
return cluster.connectToMasterThatServesSlot(slot).compose(masterResult -> {
Redis master = masterResult.redis;
RedisConnection masterConn = masterResult.conn;
String masterId = masterResult.id;
return addNewMaster().compose(newMasterResult -> {
Redis newMaster = newMasterResult.redis;
RedisConnection newMasterConn = newMasterResult.conn;
String newMasterId = newMasterResult.id;
return clusterConn.send(Request.cmd(Command.SET).arg(key1).arg("fubar"))
.compose(ignored -> {
return clusterConn.send(Request.cmd(Command.SET).arg(key2).arg("quux"));
})
.compose(ignored -> {
return newMasterConn.send(Request.cmd(Command.CLUSTER).arg("SETSLOT").arg(slot).arg("IMPORTING").arg(masterId));
})
.compose(ignored -> {
return masterConn.send(Request.cmd(Command.CLUSTER).arg("SETSLOT").arg(slot).arg("MIGRATING").arg(newMasterId));
}).compose(ignored -> {
SocketAddress newMasterAddr = ((PooledRedisConnection) newMasterConn).actual().uri().socketAddress();
return masterConn.send(Request.cmd(Command.MIGRATE).arg(newMasterAddr.host()).arg(newMasterAddr.port())
.arg("").arg(0).arg(5000).arg("KEYS").arg(key1).arg(key2));
})
.compose(ignored -> {
return masterConn.send(Request.cmd(Command.CLUSTER).arg("SETSLOT").arg(slot).arg("NODE").arg(newMasterId));
})
.compose(ignored -> {
return newMasterConn.send(Request.cmd(Command.CLUSTER).arg("SETSLOT").arg(slot).arg("NODE").arg(newMasterId));
})
.compose(ignored -> {
return clusterConn.send(Request.cmd(Command.GET).arg(key1));
})
.compose(result -> {
test.assertEquals("fubar", result.toString());
return clusterConn.send(Request.cmd(Command.GET).arg(key2));
})
.compose(result -> {
test.assertEquals("quux", result.toString());
master.close();
newMaster.close();
return Future.succeededFuture();
});
});
});
}).onComplete(test.asyncAssertSuccess(ignored -> {
async.complete();
}));
}

private Future<ClusterUtils.Result> addNewMaster() {
return rule.vertx()
.executeBlocking(() -> {
redis.addMaster(7006);
return null;
})
.compose(ignored -> {
Redis client = Redis.createClient(rule.vertx(), "redis://127.0.0.1:7006");
return client
.connect()
.compose(conn -> {
return conn.send(Request.cmd(Command.CLUSTER).arg("MYID"))
.map(resp -> {
String newId = resp.toString();
return new ClusterUtils.Result(client, conn, newId);
});
});
});
}
}

0 comments on commit 3360bbe

Please sign in to comment.