diff --git a/Makefile b/Makefile index 2987ad80dd..6e63f1b6ce 100644 --- a/Makefile +++ b/Makefile @@ -209,7 +209,7 @@ daemonize yes protected-mode no requirepass cluster port 7379 -cluster-node-timeout 50 +cluster-node-timeout 15000 pidfile /tmp/redis_cluster_node1.pid logfile /tmp/redis_cluster_node1.log save "" @@ -223,7 +223,7 @@ daemonize yes protected-mode no requirepass cluster port 7380 -cluster-node-timeout 50 +cluster-node-timeout 15000 pidfile /tmp/redis_cluster_node2.pid logfile /tmp/redis_cluster_node2.log save "" @@ -237,7 +237,7 @@ daemonize yes protected-mode no requirepass cluster port 7381 -cluster-node-timeout 50 +cluster-node-timeout 15000 pidfile /tmp/redis_cluster_node3.pid logfile /tmp/redis_cluster_node3.log save "" @@ -251,7 +251,7 @@ daemonize yes protected-mode no requirepass cluster port 7382 -cluster-node-timeout 50 +cluster-node-timeout 15000 pidfile /tmp/redis_cluster_node4.pid logfile /tmp/redis_cluster_node4.log save "" @@ -265,7 +265,7 @@ daemonize yes protected-mode no requirepass cluster port 7383 -cluster-node-timeout 5000 +cluster-node-timeout 15000 pidfile /tmp/redis_cluster_node5.pid logfile /tmp/redis_cluster_node5.log save "" diff --git a/src/main/java/redis/clients/jedis/ClusterPipeline.java b/src/main/java/redis/clients/jedis/ClusterPipeline.java index e70db767f9..0c850c8ed8 100644 --- a/src/main/java/redis/clients/jedis/ClusterPipeline.java +++ b/src/main/java/redis/clients/jedis/ClusterPipeline.java @@ -1,5 +1,6 @@ package redis.clients.jedis; +import java.time.Duration; import java.util.Set; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import redis.clients.jedis.providers.ClusterConnectionProvider; @@ -23,6 +24,13 @@ public ClusterPipeline(Set clusterNodes, JedisClientConfig clientCo this.closeable = this.provider; } + public ClusterPipeline(Set clusterNodes, JedisClientConfig clientConfig, + GenericObjectPoolConfig poolConfig, Duration topologyRefreshPeriod) { + this(new ClusterConnectionProvider(clusterNodes, clientConfig, poolConfig, topologyRefreshPeriod), + createClusterCommandObjects(clientConfig.getRedisProtocol())); + this.closeable = this.provider; + } + public ClusterPipeline(ClusterConnectionProvider provider) { this(provider, new ClusterCommandObjects()); } diff --git a/src/main/java/redis/clients/jedis/DefaultJedisSocketFactory.java b/src/main/java/redis/clients/jedis/DefaultJedisSocketFactory.java index f9c7cd2228..a2d963e221 100644 --- a/src/main/java/redis/clients/jedis/DefaultJedisSocketFactory.java +++ b/src/main/java/redis/clients/jedis/DefaultJedisSocketFactory.java @@ -70,7 +70,9 @@ private Socket connectToFirstSuccessfulHost(HostAndPort hostAndPort) throws Exce socket.setTcpNoDelay(true); // Socket buffer Whetherclosed, to ensure timely delivery of data socket.setSoLinger(true, 0); // Control calls close () method, the underlying socket is closed immediately - socket.connect(new InetSocketAddress(host.getHostAddress(), hostAndPort.getPort()), connectionTimeout); + // Passing 'host' directly will avoid another call to InetAddress.getByName() inside the InetSocketAddress constructor. + // For machines with ipv4 and ipv6, but the startNode uses ipv4 to connect, the ipv6 connection may fail. + socket.connect(new InetSocketAddress(host, hostAndPort.getPort()), connectionTimeout); return socket; } catch (Exception e) { jce.addSuppressed(e); diff --git a/src/main/java/redis/clients/jedis/JedisCluster.java b/src/main/java/redis/clients/jedis/JedisCluster.java index e9bb606191..4a73df3d07 100644 --- a/src/main/java/redis/clients/jedis/JedisCluster.java +++ b/src/main/java/redis/clients/jedis/JedisCluster.java @@ -12,6 +12,8 @@ public class JedisCluster extends UnifiedJedis { + public static final String INIT_NO_ERROR_PROPERTY = "jedis.cluster.initNoError"; + /** * Default timeout in milliseconds. */ @@ -195,6 +197,13 @@ public JedisCluster(Set clusterNodes, JedisClientConfig clientConfi super(clusterNodes, clientConfig, maxAttempts, maxTotalRetriesDuration); } + public JedisCluster(Set clusterNodes, JedisClientConfig clientConfig, + GenericObjectPoolConfig poolConfig, Duration topologyRefreshPeriod, int maxAttempts, + Duration maxTotalRetriesDuration) { + this(new ClusterConnectionProvider(clusterNodes, clientConfig, poolConfig, topologyRefreshPeriod), + maxAttempts, maxTotalRetriesDuration); + } + public JedisCluster(ClusterConnectionProvider provider, int maxAttempts, Duration maxTotalRetriesDuration) { super(provider, maxAttempts, maxTotalRetriesDuration); diff --git a/src/main/java/redis/clients/jedis/JedisClusterInfoCache.java b/src/main/java/redis/clients/jedis/JedisClusterInfoCache.java index a4cc2d1d63..bea4982fd4 100644 --- a/src/main/java/redis/clients/jedis/JedisClusterInfoCache.java +++ b/src/main/java/redis/clients/jedis/JedisClusterInfoCache.java @@ -1,5 +1,6 @@ package redis.clients.jedis; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -10,17 +11,26 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import redis.clients.jedis.exceptions.JedisClusterOperationException; import redis.clients.jedis.exceptions.JedisException; import redis.clients.jedis.util.SafeEncoder; +import static redis.clients.jedis.JedisCluster.INIT_NO_ERROR_PROPERTY; + public class JedisClusterInfoCache { + private static final Logger logger = LoggerFactory.getLogger(JedisClusterInfoCache.class); + private final Map nodes = new HashMap<>(); private final ConnectionPool[] slots = new ConnectionPool[Protocol.CLUSTER_HASHSLOTS]; private final HostAndPort[] slotNodes = new HostAndPort[Protocol.CLUSTER_HASHSLOTS]; @@ -36,21 +46,75 @@ public class JedisClusterInfoCache { private static final int MASTER_NODE_INDEX = 2; + /** + * The single thread executor for the topology refresh task. + */ + private ScheduledExecutorService topologyRefreshExecutor = null; + + class TopologyRefreshTask implements Runnable { + @Override + public void run() { + logger.debug("Cluster topology refresh run, old nodes: {}", nodes.keySet()); + renewClusterSlots(null); + logger.debug("Cluster topology refresh run, new nodes: {}", nodes.keySet()); + } + } + public JedisClusterInfoCache(final JedisClientConfig clientConfig, final Set startNodes) { this(clientConfig, null, startNodes); } public JedisClusterInfoCache(final JedisClientConfig clientConfig, final GenericObjectPoolConfig poolConfig, final Set startNodes) { + this(clientConfig, poolConfig, startNodes, null); + } + + public JedisClusterInfoCache(final JedisClientConfig clientConfig, + final GenericObjectPoolConfig poolConfig, final Set startNodes, + final Duration topologyRefreshPeriod) { this.poolConfig = poolConfig; this.clientConfig = clientConfig; this.startNodes = startNodes; + if (topologyRefreshPeriod != null) { + logger.info("Cluster topology refresh start, period: {}, startNodes: {}", topologyRefreshPeriod, startNodes); + topologyRefreshExecutor = Executors.newSingleThreadScheduledExecutor(); + topologyRefreshExecutor.scheduleWithFixedDelay(new TopologyRefreshTask(), topologyRefreshPeriod.toMillis(), + topologyRefreshPeriod.toMillis(), TimeUnit.MILLISECONDS); + } + } + + /** + * Check whether the number and order of slots in the cluster topology are equal to CLUSTER_HASHSLOTS + * @param slotsInfo the cluster topology + * @return if slots is ok, return true, elese return false. + */ + private boolean checkClusterSlotSequence(List slotsInfo) { + List slots = new ArrayList<>(); + for (Object slotInfoObj : slotsInfo) { + List slotInfo = (List)slotInfoObj; + slots.addAll(getAssignedSlotArray(slotInfo)); + } + Collections.sort(slots); + if (slots.size() != Protocol.CLUSTER_HASHSLOTS) { + return false; + } + for (int i = 0; i < Protocol.CLUSTER_HASHSLOTS; ++i) { + if (i != slots.get(i)) { + return false; + } + } + return true; } public void discoverClusterNodesAndSlots(Connection jedis) { List slotsInfo = executeClusterSlots(jedis); - if (slotsInfo.isEmpty()) { - throw new JedisClusterOperationException("Cluster slots list is empty."); + if (System.getProperty(INIT_NO_ERROR_PROPERTY) == null) { + if (slotsInfo.isEmpty()) { + throw new JedisClusterOperationException("Cluster slots list is empty."); + } + if (!checkClusterSlotSequence(slotsInfo)) { + throw new JedisClusterOperationException("Cluster slots have holes."); + } } w.lock(); try { @@ -133,8 +197,13 @@ public void renewClusterSlots(Connection jedis) { private void discoverClusterSlots(Connection jedis) { List slotsInfo = executeClusterSlots(jedis); - if (slotsInfo.isEmpty()) { - throw new JedisClusterOperationException("Cluster slots list is empty."); + if (System.getProperty(INIT_NO_ERROR_PROPERTY) == null) { + if (slotsInfo.isEmpty()) { + throw new JedisClusterOperationException("Cluster slots list is empty."); + } + if (!checkClusterSlotSequence(slotsInfo)) { + throw new JedisClusterOperationException("Cluster slots have holes."); + } } w.lock(); try { @@ -308,6 +377,14 @@ public void reset() { } } + public void close() { + reset(); + if (topologyRefreshExecutor != null) { + logger.info("Cluster topology refresh shutdown, startNodes: {}", startNodes); + topologyRefreshExecutor.shutdownNow(); + } + } + public static String getNodeKey(HostAndPort hnp) { //return hnp.getHost() + ":" + hnp.getPort(); return hnp.toString(); diff --git a/src/main/java/redis/clients/jedis/providers/ClusterConnectionProvider.java b/src/main/java/redis/clients/jedis/providers/ClusterConnectionProvider.java index 4c47f2094b..c21640713d 100644 --- a/src/main/java/redis/clients/jedis/providers/ClusterConnectionProvider.java +++ b/src/main/java/redis/clients/jedis/providers/ClusterConnectionProvider.java @@ -1,5 +1,6 @@ package redis.clients.jedis.providers; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -17,9 +18,9 @@ import redis.clients.jedis.exceptions.JedisClusterOperationException; import redis.clients.jedis.exceptions.JedisException; -public class ClusterConnectionProvider implements ConnectionProvider { +import static redis.clients.jedis.JedisCluster.INIT_NO_ERROR_PROPERTY; - private static final String INIT_NO_ERROR_PROPERTY = "jedis.cluster.initNoError"; +public class ClusterConnectionProvider implements ConnectionProvider { protected final JedisClusterInfoCache cache; @@ -34,6 +35,12 @@ public ClusterConnectionProvider(Set clusterNodes, JedisClientConfi initializeSlotsCache(clusterNodes, clientConfig); } + public ClusterConnectionProvider(Set clusterNodes, JedisClientConfig clientConfig, + GenericObjectPoolConfig poolConfig, Duration topologyRefreshPeriod) { + this.cache = new JedisClusterInfoCache(clientConfig, poolConfig, clusterNodes, topologyRefreshPeriod); + initializeSlotsCache(clusterNodes, clientConfig); + } + private void initializeSlotsCache(Set startNodes, JedisClientConfig clientConfig) { if (startNodes.isEmpty()) { throw new JedisClusterOperationException("No nodes to initialize cluster slots cache."); @@ -66,7 +73,7 @@ private void initializeSlotsCache(Set startNodes, JedisClientConfig @Override public void close() { - cache.reset(); + cache.close(); } public void renewSlotCache() { diff --git a/src/test/java/redis/clients/jedis/JedisClusterTest.java b/src/test/java/redis/clients/jedis/JedisClusterTest.java index b93fa2409e..8297eb90c6 100644 --- a/src/test/java/redis/clients/jedis/JedisClusterTest.java +++ b/src/test/java/redis/clients/jedis/JedisClusterTest.java @@ -741,6 +741,65 @@ public void clusterRefreshNodes() throws Exception { } } + @Test(timeout = 30_000) + public void clusterPeriodTopologyRefreshTest() throws Exception { + Set jedisClusterNode = new HashSet<>(); + jedisClusterNode.add(nodeInfo1); + jedisClusterNode.add(nodeInfo2); + jedisClusterNode.add(nodeInfo3); + + // we set topologyRefreshPeriod is 1s + Duration topologyRefreshPeriod = Duration.ofSeconds(1); + try (JedisCluster cluster = new JedisCluster(jedisClusterNode, DEFAULT_CLIENT_CONFIG, DEFAULT_POOL_CONFIG, + topologyRefreshPeriod, DEFAULT_REDIRECTIONS, Duration.ofSeconds(10))) { + assertEquals(3, cluster.getClusterNodes().size()); + cleanUp(); // cleanup and add node4 + + // at first, join node4 to cluster + node1.clusterMeet(LOCAL_IP, nodeInfo2.getPort()); + node1.clusterMeet(LOCAL_IP, nodeInfo3.getPort()); + node1.clusterMeet(LOCAL_IP, nodeInfo4.getPort()); + // split available slots across the three nodes + int slotsPerNode = CLUSTER_HASHSLOTS / 4; + int[] node1Slots = new int[slotsPerNode]; + int[] node2Slots = new int[slotsPerNode]; + int[] node3Slots = new int[slotsPerNode]; + int[] node4Slots = new int[slotsPerNode]; + for (int i = 0, slot1 = 0, slot2 = 0, slot3 = 0, slot4 = 0; i < CLUSTER_HASHSLOTS; i++) { + if (i < slotsPerNode) { + node1Slots[slot1++] = i; + } else if (i >= slotsPerNode && i < slotsPerNode*2) { + node2Slots[slot2++] = i; + } else if (i >= slotsPerNode*2 && i < slotsPerNode*3) { + node3Slots[slot3++] = i; + } else { + node4Slots[slot4++] = i; + } + } + + node1.clusterAddSlots(node1Slots); + node2.clusterAddSlots(node2Slots); + node3.clusterAddSlots(node3Slots); + node4.clusterAddSlots(node4Slots); + JedisClusterTestUtil.waitForClusterReady(node1, node2, node3, node4); + + // Now we just wait topologyRefreshPeriod * 3 (executor will delay) for cluster topology refresh (3 -> 4) + Thread.sleep(topologyRefreshPeriod.toMillis() * 3); + + assertEquals(4, cluster.getClusterNodes().size()); + String nodeKey4 = LOCAL_IP + ":" + nodeInfo4.getPort(); + assertTrue(cluster.getClusterNodes().keySet().contains(nodeKey4)); + + // make 4 nodes to 3 nodes + cleanUp(); + setUp(); + + // Now we just wait topologyRefreshPeriod * 3 (executor will delay) for cluster topology refresh (4 -> 3) + Thread.sleep(topologyRefreshPeriod.toMillis() * 3); + assertEquals(3, cluster.getClusterNodes().size()); + } + } + private static String getNodeServingSlotRange(String infoOutput) { // f4f3dc4befda352a4e0beccf29f5e8828438705d 127.0.0.1:7380 master - 0 // 1394372400827 0 connected 5461-10922 diff --git a/src/test/java/redis/clients/jedis/JedisClusterTestBase.java b/src/test/java/redis/clients/jedis/JedisClusterTestBase.java index 6094575a54..0746c2d37c 100644 --- a/src/test/java/redis/clients/jedis/JedisClusterTestBase.java +++ b/src/test/java/redis/clients/jedis/JedisClusterTestBase.java @@ -77,10 +77,10 @@ protected void cleanUp() { node2.flushDB(); node3.flushDB(); node4.flushDB(); - node1.clusterReset(ClusterResetType.SOFT); - node2.clusterReset(ClusterResetType.SOFT); - node3.clusterReset(ClusterResetType.SOFT); - node4.clusterReset(ClusterResetType.SOFT); + node1.clusterReset(ClusterResetType.HARD); + node2.clusterReset(ClusterResetType.HARD); + node3.clusterReset(ClusterResetType.HARD); + node4.clusterReset(ClusterResetType.HARD); } @After