Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introducing periodic topology mechanism for JedisCluster #3596

10 changes: 5 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ daemonize yes
protected-mode no
requirepass cluster
port 7379
cluster-node-timeout 50
cluster-node-timeout 15000
pidfile /tmp/redis_cluster_node1.pid
logfile /tmp/redis_cluster_node1.log
save ""
Expand All @@ -223,7 +223,7 @@ daemonize yes
protected-mode no
requirepass cluster
port 7380
cluster-node-timeout 50
cluster-node-timeout 15000
pidfile /tmp/redis_cluster_node2.pid
logfile /tmp/redis_cluster_node2.log
save ""
Expand All @@ -237,7 +237,7 @@ daemonize yes
protected-mode no
requirepass cluster
port 7381
cluster-node-timeout 50
cluster-node-timeout 15000
pidfile /tmp/redis_cluster_node3.pid
logfile /tmp/redis_cluster_node3.log
save ""
Expand All @@ -251,7 +251,7 @@ daemonize yes
protected-mode no
requirepass cluster
port 7382
cluster-node-timeout 50
cluster-node-timeout 15000
pidfile /tmp/redis_cluster_node4.pid
logfile /tmp/redis_cluster_node4.log
save ""
Expand All @@ -265,7 +265,7 @@ daemonize yes
protected-mode no
requirepass cluster
port 7383
cluster-node-timeout 5000
cluster-node-timeout 15000
pidfile /tmp/redis_cluster_node5.pid
logfile /tmp/redis_cluster_node5.log
save ""
Expand Down
8 changes: 8 additions & 0 deletions src/main/java/redis/clients/jedis/ClusterPipeline.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package redis.clients.jedis;

import java.time.Duration;
import java.util.Set;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.providers.ClusterConnectionProvider;
Expand All @@ -23,6 +24,13 @@ public ClusterPipeline(Set<HostAndPort> clusterNodes, JedisClientConfig clientCo
this.closeable = this.provider;
}

public ClusterPipeline(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig,
GenericObjectPoolConfig<Connection> poolConfig, Duration topologyRefreshPeriod) {
this(new ClusterConnectionProvider(clusterNodes, clientConfig, poolConfig, topologyRefreshPeriod),
createClusterCommandObjects(clientConfig.getRedisProtocol()));
this.closeable = this.provider;
}

public ClusterPipeline(ClusterConnectionProvider provider) {
this(provider, new ClusterCommandObjects());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ private Socket connectToFirstSuccessfulHost(HostAndPort hostAndPort) throws Exce
socket.setTcpNoDelay(true); // Socket buffer Whetherclosed, to ensure timely delivery of data
socket.setSoLinger(true, 0); // Control calls close () method, the underlying socket is closed immediately

socket.connect(new InetSocketAddress(host.getHostAddress(), hostAndPort.getPort()), connectionTimeout);
// Passing 'host' directly will avoid another call to InetAddress.getByName() inside the InetSocketAddress constructor.
// For machines with ipv4 and ipv6, but the startNode uses ipv4 to connect, the ipv6 connection may fail.
socket.connect(new InetSocketAddress(host, hostAndPort.getPort()), connectionTimeout);
yangbodong22011 marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Adding for future reference:]
Please check #3597 for more information about this change.

return socket;
} catch (Exception e) {
jce.addSuppressed(e);
Expand Down
9 changes: 9 additions & 0 deletions src/main/java/redis/clients/jedis/JedisCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

public class JedisCluster extends UnifiedJedis {

public static final String INIT_NO_ERROR_PROPERTY = "jedis.cluster.initNoError";

/**
* Default timeout in milliseconds.
*/
Expand Down Expand Up @@ -195,6 +197,13 @@ public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfi
super(clusterNodes, clientConfig, maxAttempts, maxTotalRetriesDuration);
}

public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig,
GenericObjectPoolConfig<Connection> poolConfig, Duration topologyRefreshPeriod, int maxAttempts,
Duration maxTotalRetriesDuration) {
this(new ClusterConnectionProvider(clusterNodes, clientConfig, poolConfig, topologyRefreshPeriod),
maxAttempts, maxTotalRetriesDuration);
}

public JedisCluster(ClusterConnectionProvider provider, int maxAttempts,
Duration maxTotalRetriesDuration) {
super(provider, maxAttempts, maxTotalRetriesDuration);
Expand Down
85 changes: 81 additions & 4 deletions src/main/java/redis/clients/jedis/JedisClusterInfoCache.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package redis.clients.jedis;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -10,17 +11,26 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.exceptions.JedisClusterOperationException;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.jedis.util.SafeEncoder;

import static redis.clients.jedis.JedisCluster.INIT_NO_ERROR_PROPERTY;

public class JedisClusterInfoCache {

private static final Logger logger = LoggerFactory.getLogger(JedisClusterInfoCache.class);

private final Map<String, ConnectionPool> nodes = new HashMap<>();
private final ConnectionPool[] slots = new ConnectionPool[Protocol.CLUSTER_HASHSLOTS];
private final HostAndPort[] slotNodes = new HostAndPort[Protocol.CLUSTER_HASHSLOTS];
Expand All @@ -36,21 +46,75 @@ public class JedisClusterInfoCache {

private static final int MASTER_NODE_INDEX = 2;

/**
* The single thread executor for the topology refresh task.
*/
private ScheduledExecutorService topologyRefreshExecutor = null;
yangbodong22011 marked this conversation as resolved.
Show resolved Hide resolved

class TopologyRefreshTask implements Runnable {
@Override
public void run() {
logger.debug("Cluster topology refresh run, old nodes: {}", nodes.keySet());
renewClusterSlots(null);
logger.debug("Cluster topology refresh run, new nodes: {}", nodes.keySet());
}
}

public JedisClusterInfoCache(final JedisClientConfig clientConfig, final Set<HostAndPort> startNodes) {
this(clientConfig, null, startNodes);
}

public JedisClusterInfoCache(final JedisClientConfig clientConfig,
final GenericObjectPoolConfig<Connection> poolConfig, final Set<HostAndPort> startNodes) {
this(clientConfig, poolConfig, startNodes, null);
}

public JedisClusterInfoCache(final JedisClientConfig clientConfig,
final GenericObjectPoolConfig<Connection> poolConfig, final Set<HostAndPort> startNodes,
final Duration topologyRefreshPeriod) {
this.poolConfig = poolConfig;
this.clientConfig = clientConfig;
this.startNodes = startNodes;
if (topologyRefreshPeriod != null) {
logger.info("Cluster topology refresh start, period: {}, startNodes: {}", topologyRefreshPeriod, startNodes);
topologyRefreshExecutor = Executors.newSingleThreadScheduledExecutor();
topologyRefreshExecutor.scheduleWithFixedDelay(new TopologyRefreshTask(), topologyRefreshPeriod.toMillis(),
topologyRefreshPeriod.toMillis(), TimeUnit.MILLISECONDS);
}
}

/**
* Check whether the number and order of slots in the cluster topology are equal to CLUSTER_HASHSLOTS
* @param slotsInfo the cluster topology
* @return if slots is ok, return true, elese return false.
*/
private boolean checkClusterSlotSequence(List<Object> slotsInfo) {
List<Integer> slots = new ArrayList<>();
for (Object slotInfoObj : slotsInfo) {
List<Object> slotInfo = (List<Object>)slotInfoObj;
slots.addAll(getAssignedSlotArray(slotInfo));
}
Collections.sort(slots);
if (slots.size() != Protocol.CLUSTER_HASHSLOTS) {
return false;
}
for (int i = 0; i < Protocol.CLUSTER_HASHSLOTS; ++i) {
if (i != slots.get(i)) {
return false;
}
}
return true;
}

public void discoverClusterNodesAndSlots(Connection jedis) {
List<Object> slotsInfo = executeClusterSlots(jedis);
if (slotsInfo.isEmpty()) {
throw new JedisClusterOperationException("Cluster slots list is empty.");
if (System.getProperty(INIT_NO_ERROR_PROPERTY) == null) {
if (slotsInfo.isEmpty()) {
throw new JedisClusterOperationException("Cluster slots list is empty.");
}
if (!checkClusterSlotSequence(slotsInfo)) {
throw new JedisClusterOperationException("Cluster slots have holes.");
}
}
w.lock();
try {
Expand Down Expand Up @@ -133,8 +197,13 @@ public void renewClusterSlots(Connection jedis) {

private void discoverClusterSlots(Connection jedis) {
List<Object> slotsInfo = executeClusterSlots(jedis);
if (slotsInfo.isEmpty()) {
throw new JedisClusterOperationException("Cluster slots list is empty.");
if (System.getProperty(INIT_NO_ERROR_PROPERTY) == null) {
if (slotsInfo.isEmpty()) {
throw new JedisClusterOperationException("Cluster slots list is empty.");
}
if (!checkClusterSlotSequence(slotsInfo)) {
throw new JedisClusterOperationException("Cluster slots have holes.");
}
}
w.lock();
try {
Expand Down Expand Up @@ -308,6 +377,14 @@ public void reset() {
}
}

public void close() {
reset();
if (topologyRefreshExecutor != null) {
logger.info("Cluster topology refresh shutdown, startNodes: {}", startNodes);
topologyRefreshExecutor.shutdownNow();
}
}

public static String getNodeKey(HostAndPort hnp) {
//return hnp.getHost() + ":" + hnp.getPort();
return hnp.toString();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package redis.clients.jedis.providers;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand All @@ -17,9 +18,9 @@
import redis.clients.jedis.exceptions.JedisClusterOperationException;
import redis.clients.jedis.exceptions.JedisException;

public class ClusterConnectionProvider implements ConnectionProvider {
import static redis.clients.jedis.JedisCluster.INIT_NO_ERROR_PROPERTY;

private static final String INIT_NO_ERROR_PROPERTY = "jedis.cluster.initNoError";
public class ClusterConnectionProvider implements ConnectionProvider {

protected final JedisClusterInfoCache cache;

Expand All @@ -34,6 +35,12 @@ public ClusterConnectionProvider(Set<HostAndPort> clusterNodes, JedisClientConfi
initializeSlotsCache(clusterNodes, clientConfig);
}

public ClusterConnectionProvider(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig,
GenericObjectPoolConfig<Connection> poolConfig, Duration topologyRefreshPeriod) {
this.cache = new JedisClusterInfoCache(clientConfig, poolConfig, clusterNodes, topologyRefreshPeriod);
initializeSlotsCache(clusterNodes, clientConfig);
}

private void initializeSlotsCache(Set<HostAndPort> startNodes, JedisClientConfig clientConfig) {
if (startNodes.isEmpty()) {
throw new JedisClusterOperationException("No nodes to initialize cluster slots cache.");
Expand Down Expand Up @@ -66,7 +73,7 @@ private void initializeSlotsCache(Set<HostAndPort> startNodes, JedisClientConfig

@Override
public void close() {
cache.reset();
cache.close();
}

public void renewSlotCache() {
Expand Down
59 changes: 59 additions & 0 deletions src/test/java/redis/clients/jedis/JedisClusterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -741,6 +741,65 @@ public void clusterRefreshNodes() throws Exception {
}
}

@Test(timeout = 30_000)
public void clusterPeriodTopologyRefreshTest() throws Exception {
Set<HostAndPort> jedisClusterNode = new HashSet<>();
jedisClusterNode.add(nodeInfo1);
jedisClusterNode.add(nodeInfo2);
jedisClusterNode.add(nodeInfo3);

// we set topologyRefreshPeriod is 1s
Duration topologyRefreshPeriod = Duration.ofSeconds(1);
try (JedisCluster cluster = new JedisCluster(jedisClusterNode, DEFAULT_CLIENT_CONFIG, DEFAULT_POOL_CONFIG,
topologyRefreshPeriod, DEFAULT_REDIRECTIONS, Duration.ofSeconds(10))) {
assertEquals(3, cluster.getClusterNodes().size());
cleanUp(); // cleanup and add node4

// at first, join node4 to cluster
node1.clusterMeet(LOCAL_IP, nodeInfo2.getPort());
node1.clusterMeet(LOCAL_IP, nodeInfo3.getPort());
node1.clusterMeet(LOCAL_IP, nodeInfo4.getPort());
// split available slots across the three nodes
int slotsPerNode = CLUSTER_HASHSLOTS / 4;
int[] node1Slots = new int[slotsPerNode];
int[] node2Slots = new int[slotsPerNode];
int[] node3Slots = new int[slotsPerNode];
int[] node4Slots = new int[slotsPerNode];
for (int i = 0, slot1 = 0, slot2 = 0, slot3 = 0, slot4 = 0; i < CLUSTER_HASHSLOTS; i++) {
if (i < slotsPerNode) {
node1Slots[slot1++] = i;
} else if (i >= slotsPerNode && i < slotsPerNode*2) {
node2Slots[slot2++] = i;
} else if (i >= slotsPerNode*2 && i < slotsPerNode*3) {
node3Slots[slot3++] = i;
} else {
node4Slots[slot4++] = i;
}
}

node1.clusterAddSlots(node1Slots);
node2.clusterAddSlots(node2Slots);
node3.clusterAddSlots(node3Slots);
node4.clusterAddSlots(node4Slots);
JedisClusterTestUtil.waitForClusterReady(node1, node2, node3, node4);

// Now we just wait topologyRefreshPeriod * 3 (executor will delay) for cluster topology refresh (3 -> 4)
Thread.sleep(topologyRefreshPeriod.toMillis() * 3);

assertEquals(4, cluster.getClusterNodes().size());
String nodeKey4 = LOCAL_IP + ":" + nodeInfo4.getPort();
assertTrue(cluster.getClusterNodes().keySet().contains(nodeKey4));

// make 4 nodes to 3 nodes
cleanUp();
setUp();

// Now we just wait topologyRefreshPeriod * 3 (executor will delay) for cluster topology refresh (4 -> 3)
Thread.sleep(topologyRefreshPeriod.toMillis() * 3);
assertEquals(3, cluster.getClusterNodes().size());
}
}

private static String getNodeServingSlotRange(String infoOutput) {
// f4f3dc4befda352a4e0beccf29f5e8828438705d 127.0.0.1:7380 master - 0
// 1394372400827 0 connected 5461-10922
Expand Down
8 changes: 4 additions & 4 deletions src/test/java/redis/clients/jedis/JedisClusterTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,10 @@ protected void cleanUp() {
node2.flushDB();
node3.flushDB();
node4.flushDB();
node1.clusterReset(ClusterResetType.SOFT);
node2.clusterReset(ClusterResetType.SOFT);
node3.clusterReset(ClusterResetType.SOFT);
node4.clusterReset(ClusterResetType.SOFT);
node1.clusterReset(ClusterResetType.HARD);
node2.clusterReset(ClusterResetType.HARD);
node3.clusterReset(ClusterResetType.HARD);
node4.clusterReset(ClusterResetType.HARD);
}

@After
Expand Down
Loading