From 745a0968cc407b92df4352993b1ab80724232ab3 Mon Sep 17 00:00:00 2001 From: "bodong.ybd" Date: Wed, 25 Oct 2023 19:10:44 +0800 Subject: [PATCH 1/8] Introducing periodic topology mechanism for JedisCluster solves #3595 --- Makefile | 10 +-- .../redis/clients/jedis/ClusterPipeline.java | 9 +++ .../jedis/DefaultJedisSocketFactory.java | 2 +- .../redis/clients/jedis/JedisCluster.java | 7 ++ .../clients/jedis/JedisClusterInfoCache.java | 80 ++++++++++++++++++- .../redis/clients/jedis/UnifiedJedis.java | 9 +++ .../providers/ClusterConnectionProvider.java | 10 ++- .../redis/clients/jedis/JedisClusterTest.java | 60 ++++++++++++++ .../clients/jedis/JedisClusterTestBase.java | 8 +- 9 files changed, 183 insertions(+), 12 deletions(-) diff --git a/Makefile b/Makefile index 2987ad80dd..6e63f1b6ce 100644 --- a/Makefile +++ b/Makefile @@ -209,7 +209,7 @@ daemonize yes protected-mode no requirepass cluster port 7379 -cluster-node-timeout 50 +cluster-node-timeout 15000 pidfile /tmp/redis_cluster_node1.pid logfile /tmp/redis_cluster_node1.log save "" @@ -223,7 +223,7 @@ daemonize yes protected-mode no requirepass cluster port 7380 -cluster-node-timeout 50 +cluster-node-timeout 15000 pidfile /tmp/redis_cluster_node2.pid logfile /tmp/redis_cluster_node2.log save "" @@ -237,7 +237,7 @@ daemonize yes protected-mode no requirepass cluster port 7381 -cluster-node-timeout 50 +cluster-node-timeout 15000 pidfile /tmp/redis_cluster_node3.pid logfile /tmp/redis_cluster_node3.log save "" @@ -251,7 +251,7 @@ daemonize yes protected-mode no requirepass cluster port 7382 -cluster-node-timeout 50 +cluster-node-timeout 15000 pidfile /tmp/redis_cluster_node4.pid logfile /tmp/redis_cluster_node4.log save "" @@ -265,7 +265,7 @@ daemonize yes protected-mode no requirepass cluster port 7383 -cluster-node-timeout 5000 +cluster-node-timeout 15000 pidfile /tmp/redis_cluster_node5.pid logfile /tmp/redis_cluster_node5.log save "" diff --git a/src/main/java/redis/clients/jedis/ClusterPipeline.java b/src/main/java/redis/clients/jedis/ClusterPipeline.java index e70db767f9..e7396b7e46 100644 --- a/src/main/java/redis/clients/jedis/ClusterPipeline.java +++ b/src/main/java/redis/clients/jedis/ClusterPipeline.java @@ -1,5 +1,6 @@ package redis.clients.jedis; +import java.time.Duration; import java.util.Set; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import redis.clients.jedis.providers.ClusterConnectionProvider; @@ -23,6 +24,14 @@ public ClusterPipeline(Set clusterNodes, JedisClientConfig clientCo this.closeable = this.provider; } + public ClusterPipeline(Set clusterNodes, JedisClientConfig clientConfig, + GenericObjectPoolConfig poolConfig, boolean topologyRefreshEnabled, Duration topologyRefreshPeriod) { + this(new ClusterConnectionProvider(clusterNodes, clientConfig, poolConfig, + topologyRefreshEnabled, topologyRefreshPeriod), + createClusterCommandObjects(clientConfig.getRedisProtocol())); + this.closeable = this.provider; + } + public ClusterPipeline(ClusterConnectionProvider provider) { this(provider, new ClusterCommandObjects()); } diff --git a/src/main/java/redis/clients/jedis/DefaultJedisSocketFactory.java b/src/main/java/redis/clients/jedis/DefaultJedisSocketFactory.java index f9c7cd2228..6acf24c13c 100644 --- a/src/main/java/redis/clients/jedis/DefaultJedisSocketFactory.java +++ b/src/main/java/redis/clients/jedis/DefaultJedisSocketFactory.java @@ -70,7 +70,7 @@ private Socket connectToFirstSuccessfulHost(HostAndPort hostAndPort) throws Exce socket.setTcpNoDelay(true); // Socket buffer Whetherclosed, to ensure timely delivery of data socket.setSoLinger(true, 0); // Control calls close () method, the underlying socket is closed immediately - socket.connect(new InetSocketAddress(host.getHostAddress(), hostAndPort.getPort()), connectionTimeout); + socket.connect(new InetSocketAddress(host, hostAndPort.getPort()), connectionTimeout); return socket; } catch (Exception e) { jce.addSuppressed(e); diff --git a/src/main/java/redis/clients/jedis/JedisCluster.java b/src/main/java/redis/clients/jedis/JedisCluster.java index e9bb606191..8f79ccb810 100644 --- a/src/main/java/redis/clients/jedis/JedisCluster.java +++ b/src/main/java/redis/clients/jedis/JedisCluster.java @@ -195,6 +195,13 @@ public JedisCluster(Set clusterNodes, JedisClientConfig clientConfi super(clusterNodes, clientConfig, maxAttempts, maxTotalRetriesDuration); } + public JedisCluster(Set clusterNodes, JedisClientConfig clientConfig, + GenericObjectPoolConfig poolConfig, int maxAttempts, Duration maxTotalRetriesDuration, + boolean topologyRefreshEnabled, Duration topologyRefreshPeriod) { + super(clusterNodes, clientConfig, poolConfig, maxAttempts, maxTotalRetriesDuration, topologyRefreshEnabled, + topologyRefreshPeriod); + } + public JedisCluster(ClusterConnectionProvider provider, int maxAttempts, Duration maxTotalRetriesDuration) { super(provider, maxAttempts, maxTotalRetriesDuration); diff --git a/src/main/java/redis/clients/jedis/JedisClusterInfoCache.java b/src/main/java/redis/clients/jedis/JedisClusterInfoCache.java index a4cc2d1d63..7c1f9a3eea 100644 --- a/src/main/java/redis/clients/jedis/JedisClusterInfoCache.java +++ b/src/main/java/redis/clients/jedis/JedisClusterInfoCache.java @@ -1,5 +1,6 @@ package redis.clients.jedis; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -10,16 +11,24 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import redis.clients.jedis.exceptions.JedisClusterOperationException; import redis.clients.jedis.exceptions.JedisException; import redis.clients.jedis.util.SafeEncoder; public class JedisClusterInfoCache { + private static final Logger logger = LoggerFactory.getLogger(JedisClusterInfoCache.class); + private static final boolean DEFAULT_TOPOLOGY_REFRESH_ENABLED = false; + private static final Duration DEFAULT_TOPOLOGY_REFRESH_PERIOD = Duration.ofSeconds(60); private final Map nodes = new HashMap<>(); private final ConnectionPool[] slots = new ConnectionPool[Protocol.CLUSTER_HASHSLOTS]; @@ -36,15 +45,70 @@ public class JedisClusterInfoCache { private static final int MASTER_NODE_INDEX = 2; + private final boolean topologyRefreshEnabled; + private final Duration topologyRefreshPeriod; + + /** + * The single thread executor for the topology refresh task. + */ + private ScheduledExecutorService topologyRefreshExecutor = null; + + class TopologyRefreshTask implements Runnable { + @Override + public void run() { + logger.debug("Cluster topology refresh run, old nodes: {}", nodes.keySet()); + renewClusterSlots(null); + logger.debug("Cluster topology refresh run, new nodes: {}", nodes.keySet()); + } + } + public JedisClusterInfoCache(final JedisClientConfig clientConfig, final Set startNodes) { - this(clientConfig, null, startNodes); + this(clientConfig, null, startNodes, DEFAULT_TOPOLOGY_REFRESH_ENABLED, DEFAULT_TOPOLOGY_REFRESH_PERIOD); } public JedisClusterInfoCache(final JedisClientConfig clientConfig, final GenericObjectPoolConfig poolConfig, final Set startNodes) { + this(clientConfig, poolConfig, startNodes, DEFAULT_TOPOLOGY_REFRESH_ENABLED, DEFAULT_TOPOLOGY_REFRESH_PERIOD); + } + + public JedisClusterInfoCache(final JedisClientConfig clientConfig, + final GenericObjectPoolConfig poolConfig, final Set startNodes, + final boolean topologyRefreshEnabled, final Duration topologyRefreshPeriod) { this.poolConfig = poolConfig; this.clientConfig = clientConfig; this.startNodes = startNodes; + this.topologyRefreshEnabled = topologyRefreshEnabled; + this.topologyRefreshPeriod = topologyRefreshPeriod; + if (topologyRefreshEnabled) { + logger.info("Cluster topology refresh start, period: {}, startNodes: {}", + topologyRefreshPeriod.toString(), startNodes); + topologyRefreshExecutor = Executors.newSingleThreadScheduledExecutor(); + topologyRefreshExecutor.scheduleWithFixedDelay(new TopologyRefreshTask(), topologyRefreshPeriod.toMillis(), + topologyRefreshPeriod.toMillis(), TimeUnit.MILLISECONDS); + } + } + + /** + * Check whether the number and order of slots in the cluster topology are equal to CLUSTER_HASHSLOTS + * @param slotsInfo the cluster topology + * @return if slots is ok, return true, elese return false. + */ + private boolean checkClusterSlotSequence(List slotsInfo) { + List slots = new ArrayList<>(); + for (Object slotInfoObj : slotsInfo) { + List slotInfo = (List)slotInfoObj; + slots.addAll(getAssignedSlotArray(slotInfo)); + } + Collections.sort(slots); + if (slots.size() != Protocol.CLUSTER_HASHSLOTS) { + return false; + } + for (int i = 0; i < Protocol.CLUSTER_HASHSLOTS; ++i) { + if (i != slots.get(i)) { + return false; + } + } + return true; } public void discoverClusterNodesAndSlots(Connection jedis) { @@ -52,6 +116,9 @@ public void discoverClusterNodesAndSlots(Connection jedis) { if (slotsInfo.isEmpty()) { throw new JedisClusterOperationException("Cluster slots list is empty."); } + if (!checkClusterSlotSequence(slotsInfo)) { + throw new JedisClusterOperationException("Cluster slots have holes."); + } w.lock(); try { reset(); @@ -136,6 +203,9 @@ private void discoverClusterSlots(Connection jedis) { if (slotsInfo.isEmpty()) { throw new JedisClusterOperationException("Cluster slots list is empty."); } + if (!checkClusterSlotSequence(slotsInfo)) { + throw new JedisClusterOperationException("Cluster slots have holes."); + } w.lock(); try { Arrays.fill(slots, null); @@ -308,6 +378,14 @@ public void reset() { } } + public void close() { + reset(); + if (topologyRefreshEnabled && topologyRefreshExecutor != null) { + logger.info("Cluster topology refresh shutdown, startNodes: {}", startNodes); + topologyRefreshExecutor.shutdownNow(); + } + } + public static String getNodeKey(HostAndPort hnp) { //return hnp.getHost() + ":" + hnp.getPort(); return hnp.toString(); diff --git a/src/main/java/redis/clients/jedis/UnifiedJedis.java b/src/main/java/redis/clients/jedis/UnifiedJedis.java index 2184534ca6..f32a6d9973 100644 --- a/src/main/java/redis/clients/jedis/UnifiedJedis.java +++ b/src/main/java/redis/clients/jedis/UnifiedJedis.java @@ -164,6 +164,15 @@ public UnifiedJedis(Set jedisClusterNodes, JedisClientConfig client if (proto != null) commandObjects.setProtocol(proto); } + public UnifiedJedis(Set jedisClusterNodes, JedisClientConfig clientConfig, + GenericObjectPoolConfig poolConfig, int maxAttempts, Duration maxTotalRetriesDuration, + boolean topologyRefreshEnabled, Duration topologyRefreshPeriod) { + this(new ClusterConnectionProvider(jedisClusterNodes, clientConfig, poolConfig, topologyRefreshEnabled, + topologyRefreshPeriod), maxAttempts, maxTotalRetriesDuration); + RedisProtocol proto = clientConfig.getRedisProtocol(); + if (proto != null) commandObjects.setProtocol(proto); + } + public UnifiedJedis(ClusterConnectionProvider provider, int maxAttempts, Duration maxTotalRetriesDuration) { this.provider = provider; this.executor = new ClusterCommandExecutor(provider, maxAttempts, maxTotalRetriesDuration); diff --git a/src/main/java/redis/clients/jedis/providers/ClusterConnectionProvider.java b/src/main/java/redis/clients/jedis/providers/ClusterConnectionProvider.java index 4c47f2094b..6550a9410e 100644 --- a/src/main/java/redis/clients/jedis/providers/ClusterConnectionProvider.java +++ b/src/main/java/redis/clients/jedis/providers/ClusterConnectionProvider.java @@ -1,5 +1,6 @@ package redis.clients.jedis.providers; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -34,6 +35,13 @@ public ClusterConnectionProvider(Set clusterNodes, JedisClientConfi initializeSlotsCache(clusterNodes, clientConfig); } + public ClusterConnectionProvider(Set clusterNodes, JedisClientConfig clientConfig, + GenericObjectPoolConfig poolConfig, boolean topologyRefreshEnabled, Duration topologyRefreshPeriod) { + this.cache = new JedisClusterInfoCache(clientConfig, poolConfig, clusterNodes, topologyRefreshEnabled, + topologyRefreshPeriod); + initializeSlotsCache(clusterNodes, clientConfig); + } + private void initializeSlotsCache(Set startNodes, JedisClientConfig clientConfig) { if (startNodes.isEmpty()) { throw new JedisClusterOperationException("No nodes to initialize cluster slots cache."); @@ -66,7 +74,7 @@ private void initializeSlotsCache(Set startNodes, JedisClientConfig @Override public void close() { - cache.reset(); + cache.close(); } public void renewSlotCache() { diff --git a/src/test/java/redis/clients/jedis/JedisClusterTest.java b/src/test/java/redis/clients/jedis/JedisClusterTest.java index b93fa2409e..b6ec669882 100644 --- a/src/test/java/redis/clients/jedis/JedisClusterTest.java +++ b/src/test/java/redis/clients/jedis/JedisClusterTest.java @@ -741,6 +741,66 @@ public void clusterRefreshNodes() throws Exception { } } + @Test + public void clusterPeriodTopologyRefreshTest() throws Exception { + Set jedisClusterNode = new HashSet<>(); + jedisClusterNode.add(nodeInfo1); + jedisClusterNode.add(nodeInfo2); + jedisClusterNode.add(nodeInfo3); + + // we set topologyRefreshPeriod is 5s + boolean topologyRefreshEnabled = true; + Duration topologyRefreshPeriod = Duration.ofSeconds(3); + try (JedisCluster cluster = new JedisCluster(jedisClusterNode, DEFAULT_CLIENT_CONFIG, DEFAULT_POOL_CONFIG, + DEFAULT_REDIRECTIONS, Duration.ofSeconds(1000), topologyRefreshEnabled, topologyRefreshPeriod)) { + assertEquals(3, cluster.getClusterNodes().size()); + cleanUp(); // cleanup and add node4 + + // at first, join node4 to cluster + node1.clusterMeet(LOCAL_IP, nodeInfo2.getPort()); + node1.clusterMeet(LOCAL_IP, nodeInfo3.getPort()); + node1.clusterMeet(LOCAL_IP, nodeInfo4.getPort()); + // split available slots across the three nodes + int slotsPerNode = CLUSTER_HASHSLOTS / 4; + int[] node1Slots = new int[slotsPerNode]; + int[] node2Slots = new int[slotsPerNode]; + int[] node3Slots = new int[slotsPerNode]; + int[] node4Slots = new int[slotsPerNode]; + for (int i = 0, slot1 = 0, slot2 = 0, slot3 = 0, slot4 = 0; i < CLUSTER_HASHSLOTS; i++) { + if (i < slotsPerNode) { + node1Slots[slot1++] = i; + } else if (i >= slotsPerNode && i < slotsPerNode*2) { + node2Slots[slot2++] = i; + } else if (i >= slotsPerNode*2 && i < slotsPerNode*3) { + node3Slots[slot3++] = i; + } else { + node4Slots[slot4++] = i; + } + } + + node1.clusterAddSlots(node1Slots); + node2.clusterAddSlots(node2Slots); + node3.clusterAddSlots(node3Slots); + node4.clusterAddSlots(node4Slots); + JedisClusterTestUtil.waitForClusterReady(node1, node2, node3, node4); + + // Now we just wait topologyRefreshPeriod * 3 (executor will delay) for cluster topology refresh (3 -> 4) + Thread.sleep(topologyRefreshPeriod.toMillis() * 3); + + assertEquals(4, cluster.getClusterNodes().size()); + String nodeKey4 = LOCAL_IP + ":" + nodeInfo4.getPort(); + assertTrue(cluster.getClusterNodes().keySet().contains(nodeKey4)); + + // make 4 nodes to 3 nodes + cleanUp(); + setUp(); + + // Now we just wait topologyRefreshPeriod * 3 (executor will delay) for cluster topology refresh (4 -> 3) + Thread.sleep(topologyRefreshPeriod.toMillis() * 3); + assertEquals(3, cluster.getClusterNodes().size()); + } + } + private static String getNodeServingSlotRange(String infoOutput) { // f4f3dc4befda352a4e0beccf29f5e8828438705d 127.0.0.1:7380 master - 0 // 1394372400827 0 connected 5461-10922 diff --git a/src/test/java/redis/clients/jedis/JedisClusterTestBase.java b/src/test/java/redis/clients/jedis/JedisClusterTestBase.java index 6094575a54..0746c2d37c 100644 --- a/src/test/java/redis/clients/jedis/JedisClusterTestBase.java +++ b/src/test/java/redis/clients/jedis/JedisClusterTestBase.java @@ -77,10 +77,10 @@ protected void cleanUp() { node2.flushDB(); node3.flushDB(); node4.flushDB(); - node1.clusterReset(ClusterResetType.SOFT); - node2.clusterReset(ClusterResetType.SOFT); - node3.clusterReset(ClusterResetType.SOFT); - node4.clusterReset(ClusterResetType.SOFT); + node1.clusterReset(ClusterResetType.HARD); + node2.clusterReset(ClusterResetType.HARD); + node3.clusterReset(ClusterResetType.HARD); + node4.clusterReset(ClusterResetType.HARD); } @After From f048a02ff394e2ac8a8079bb5571c61f4f65c07a Mon Sep 17 00:00:00 2001 From: "bodong.ybd" Date: Tue, 31 Oct 2023 16:37:02 +0800 Subject: [PATCH 2/8] Apply suggestions from sazzad16 Co-authored-by: M Sazzadul Hoque <7600764+sazzad16@users.noreply.github.com> --- src/main/java/redis/clients/jedis/JedisCluster.java | 8 ++++---- src/main/java/redis/clients/jedis/UnifiedJedis.java | 9 --------- 2 files changed, 4 insertions(+), 13 deletions(-) diff --git a/src/main/java/redis/clients/jedis/JedisCluster.java b/src/main/java/redis/clients/jedis/JedisCluster.java index 8f79ccb810..0764351513 100644 --- a/src/main/java/redis/clients/jedis/JedisCluster.java +++ b/src/main/java/redis/clients/jedis/JedisCluster.java @@ -196,10 +196,10 @@ public JedisCluster(Set clusterNodes, JedisClientConfig clientConfi } public JedisCluster(Set clusterNodes, JedisClientConfig clientConfig, - GenericObjectPoolConfig poolConfig, int maxAttempts, Duration maxTotalRetriesDuration, - boolean topologyRefreshEnabled, Duration topologyRefreshPeriod) { - super(clusterNodes, clientConfig, poolConfig, maxAttempts, maxTotalRetriesDuration, topologyRefreshEnabled, - topologyRefreshPeriod); + GenericObjectPoolConfig poolConfig, boolean topologyRefreshEnabled, + Duration topologyRefreshPeriod, int maxAttempts, Duration maxTotalRetriesDuration) { + this(new ClusterConnectionProvider(clusterNodes, clientConfig, poolConfig, topologyRefreshEnabled, + topologyRefreshPeriod), maxAttempts, maxTotalRetriesDuration); } public JedisCluster(ClusterConnectionProvider provider, int maxAttempts, diff --git a/src/main/java/redis/clients/jedis/UnifiedJedis.java b/src/main/java/redis/clients/jedis/UnifiedJedis.java index f32a6d9973..2184534ca6 100644 --- a/src/main/java/redis/clients/jedis/UnifiedJedis.java +++ b/src/main/java/redis/clients/jedis/UnifiedJedis.java @@ -164,15 +164,6 @@ public UnifiedJedis(Set jedisClusterNodes, JedisClientConfig client if (proto != null) commandObjects.setProtocol(proto); } - public UnifiedJedis(Set jedisClusterNodes, JedisClientConfig clientConfig, - GenericObjectPoolConfig poolConfig, int maxAttempts, Duration maxTotalRetriesDuration, - boolean topologyRefreshEnabled, Duration topologyRefreshPeriod) { - this(new ClusterConnectionProvider(jedisClusterNodes, clientConfig, poolConfig, topologyRefreshEnabled, - topologyRefreshPeriod), maxAttempts, maxTotalRetriesDuration); - RedisProtocol proto = clientConfig.getRedisProtocol(); - if (proto != null) commandObjects.setProtocol(proto); - } - public UnifiedJedis(ClusterConnectionProvider provider, int maxAttempts, Duration maxTotalRetriesDuration) { this.provider = provider; this.executor = new ClusterCommandExecutor(provider, maxAttempts, maxTotalRetriesDuration); From 6ef65e387d15c8c4ee85ff3355e9f52c808b7d73 Mon Sep 17 00:00:00 2001 From: "bodong.ybd" Date: Tue, 31 Oct 2023 16:49:16 +0800 Subject: [PATCH 3/8] Remove topologyRefreshEnabled --- .../redis/clients/jedis/ClusterPipeline.java | 5 ++--- .../java/redis/clients/jedis/JedisCluster.java | 8 ++++---- .../clients/jedis/JedisClusterInfoCache.java | 18 +++++++----------- .../providers/ClusterConnectionProvider.java | 5 ++--- .../redis/clients/jedis/JedisClusterTest.java | 3 +-- 5 files changed, 16 insertions(+), 23 deletions(-) diff --git a/src/main/java/redis/clients/jedis/ClusterPipeline.java b/src/main/java/redis/clients/jedis/ClusterPipeline.java index e7396b7e46..0c850c8ed8 100644 --- a/src/main/java/redis/clients/jedis/ClusterPipeline.java +++ b/src/main/java/redis/clients/jedis/ClusterPipeline.java @@ -25,9 +25,8 @@ public ClusterPipeline(Set clusterNodes, JedisClientConfig clientCo } public ClusterPipeline(Set clusterNodes, JedisClientConfig clientConfig, - GenericObjectPoolConfig poolConfig, boolean topologyRefreshEnabled, Duration topologyRefreshPeriod) { - this(new ClusterConnectionProvider(clusterNodes, clientConfig, poolConfig, - topologyRefreshEnabled, topologyRefreshPeriod), + GenericObjectPoolConfig poolConfig, Duration topologyRefreshPeriod) { + this(new ClusterConnectionProvider(clusterNodes, clientConfig, poolConfig, topologyRefreshPeriod), createClusterCommandObjects(clientConfig.getRedisProtocol())); this.closeable = this.provider; } diff --git a/src/main/java/redis/clients/jedis/JedisCluster.java b/src/main/java/redis/clients/jedis/JedisCluster.java index 0764351513..736dfa2a5f 100644 --- a/src/main/java/redis/clients/jedis/JedisCluster.java +++ b/src/main/java/redis/clients/jedis/JedisCluster.java @@ -196,10 +196,10 @@ public JedisCluster(Set clusterNodes, JedisClientConfig clientConfi } public JedisCluster(Set clusterNodes, JedisClientConfig clientConfig, - GenericObjectPoolConfig poolConfig, boolean topologyRefreshEnabled, - Duration topologyRefreshPeriod, int maxAttempts, Duration maxTotalRetriesDuration) { - this(new ClusterConnectionProvider(clusterNodes, clientConfig, poolConfig, topologyRefreshEnabled, - topologyRefreshPeriod), maxAttempts, maxTotalRetriesDuration); + GenericObjectPoolConfig poolConfig, Duration topologyRefreshPeriod, int maxAttempts, + Duration maxTotalRetriesDuration) { + this(new ClusterConnectionProvider(clusterNodes, clientConfig, poolConfig, topologyRefreshPeriod), + maxAttempts, maxTotalRetriesDuration); } public JedisCluster(ClusterConnectionProvider provider, int maxAttempts, diff --git a/src/main/java/redis/clients/jedis/JedisClusterInfoCache.java b/src/main/java/redis/clients/jedis/JedisClusterInfoCache.java index 7c1f9a3eea..6deeb9de43 100644 --- a/src/main/java/redis/clients/jedis/JedisClusterInfoCache.java +++ b/src/main/java/redis/clients/jedis/JedisClusterInfoCache.java @@ -27,8 +27,7 @@ public class JedisClusterInfoCache { private static final Logger logger = LoggerFactory.getLogger(JedisClusterInfoCache.class); - private static final boolean DEFAULT_TOPOLOGY_REFRESH_ENABLED = false; - private static final Duration DEFAULT_TOPOLOGY_REFRESH_PERIOD = Duration.ofSeconds(60); + private static final Duration DEFAULT_TOPOLOGY_REFRESH_PERIOD = null; private final Map nodes = new HashMap<>(); private final ConnectionPool[] slots = new ConnectionPool[Protocol.CLUSTER_HASHSLOTS]; @@ -45,7 +44,6 @@ public class JedisClusterInfoCache { private static final int MASTER_NODE_INDEX = 2; - private final boolean topologyRefreshEnabled; private final Duration topologyRefreshPeriod; /** @@ -63,25 +61,23 @@ public void run() { } public JedisClusterInfoCache(final JedisClientConfig clientConfig, final Set startNodes) { - this(clientConfig, null, startNodes, DEFAULT_TOPOLOGY_REFRESH_ENABLED, DEFAULT_TOPOLOGY_REFRESH_PERIOD); + this(clientConfig, null, startNodes, DEFAULT_TOPOLOGY_REFRESH_PERIOD); } public JedisClusterInfoCache(final JedisClientConfig clientConfig, final GenericObjectPoolConfig poolConfig, final Set startNodes) { - this(clientConfig, poolConfig, startNodes, DEFAULT_TOPOLOGY_REFRESH_ENABLED, DEFAULT_TOPOLOGY_REFRESH_PERIOD); + this(clientConfig, poolConfig, startNodes, DEFAULT_TOPOLOGY_REFRESH_PERIOD); } public JedisClusterInfoCache(final JedisClientConfig clientConfig, final GenericObjectPoolConfig poolConfig, final Set startNodes, - final boolean topologyRefreshEnabled, final Duration topologyRefreshPeriod) { + final Duration topologyRefreshPeriod) { this.poolConfig = poolConfig; this.clientConfig = clientConfig; this.startNodes = startNodes; - this.topologyRefreshEnabled = topologyRefreshEnabled; this.topologyRefreshPeriod = topologyRefreshPeriod; - if (topologyRefreshEnabled) { - logger.info("Cluster topology refresh start, period: {}, startNodes: {}", - topologyRefreshPeriod.toString(), startNodes); + if (topologyRefreshPeriod != null) { + logger.info("Cluster topology refresh start, period: {}, startNodes: {}", topologyRefreshPeriod, startNodes); topologyRefreshExecutor = Executors.newSingleThreadScheduledExecutor(); topologyRefreshExecutor.scheduleWithFixedDelay(new TopologyRefreshTask(), topologyRefreshPeriod.toMillis(), topologyRefreshPeriod.toMillis(), TimeUnit.MILLISECONDS); @@ -380,7 +376,7 @@ public void reset() { public void close() { reset(); - if (topologyRefreshEnabled && topologyRefreshExecutor != null) { + if (topologyRefreshPeriod != null && topologyRefreshExecutor != null) { logger.info("Cluster topology refresh shutdown, startNodes: {}", startNodes); topologyRefreshExecutor.shutdownNow(); } diff --git a/src/main/java/redis/clients/jedis/providers/ClusterConnectionProvider.java b/src/main/java/redis/clients/jedis/providers/ClusterConnectionProvider.java index 6550a9410e..c959de26c6 100644 --- a/src/main/java/redis/clients/jedis/providers/ClusterConnectionProvider.java +++ b/src/main/java/redis/clients/jedis/providers/ClusterConnectionProvider.java @@ -36,9 +36,8 @@ public ClusterConnectionProvider(Set clusterNodes, JedisClientConfi } public ClusterConnectionProvider(Set clusterNodes, JedisClientConfig clientConfig, - GenericObjectPoolConfig poolConfig, boolean topologyRefreshEnabled, Duration topologyRefreshPeriod) { - this.cache = new JedisClusterInfoCache(clientConfig, poolConfig, clusterNodes, topologyRefreshEnabled, - topologyRefreshPeriod); + GenericObjectPoolConfig poolConfig, Duration topologyRefreshPeriod) { + this.cache = new JedisClusterInfoCache(clientConfig, poolConfig, clusterNodes, topologyRefreshPeriod); initializeSlotsCache(clusterNodes, clientConfig); } diff --git a/src/test/java/redis/clients/jedis/JedisClusterTest.java b/src/test/java/redis/clients/jedis/JedisClusterTest.java index b6ec669882..e5752fad43 100644 --- a/src/test/java/redis/clients/jedis/JedisClusterTest.java +++ b/src/test/java/redis/clients/jedis/JedisClusterTest.java @@ -749,10 +749,9 @@ public void clusterPeriodTopologyRefreshTest() throws Exception { jedisClusterNode.add(nodeInfo3); // we set topologyRefreshPeriod is 5s - boolean topologyRefreshEnabled = true; Duration topologyRefreshPeriod = Duration.ofSeconds(3); try (JedisCluster cluster = new JedisCluster(jedisClusterNode, DEFAULT_CLIENT_CONFIG, DEFAULT_POOL_CONFIG, - DEFAULT_REDIRECTIONS, Duration.ofSeconds(1000), topologyRefreshEnabled, topologyRefreshPeriod)) { + topologyRefreshPeriod, DEFAULT_REDIRECTIONS, Duration.ofSeconds(1000))) { assertEquals(3, cluster.getClusterNodes().size()); cleanUp(); // cleanup and add node4 From 439a2a71e1631ea3e8ed93b6672f5cbe768aee08 Mon Sep 17 00:00:00 2001 From: "bodong.ybd" Date: Tue, 31 Oct 2023 19:03:52 +0800 Subject: [PATCH 4/8] Apply suggestions from sazzad16 Co-authored-by: M Sazzadul Hoque <7600764+sazzad16@users.noreply.github.com> --- .../redis/clients/jedis/JedisClusterInfoCache.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/main/java/redis/clients/jedis/JedisClusterInfoCache.java b/src/main/java/redis/clients/jedis/JedisClusterInfoCache.java index 6deeb9de43..b82d0a99e8 100644 --- a/src/main/java/redis/clients/jedis/JedisClusterInfoCache.java +++ b/src/main/java/redis/clients/jedis/JedisClusterInfoCache.java @@ -26,8 +26,8 @@ import redis.clients.jedis.util.SafeEncoder; public class JedisClusterInfoCache { + private static final Logger logger = LoggerFactory.getLogger(JedisClusterInfoCache.class); - private static final Duration DEFAULT_TOPOLOGY_REFRESH_PERIOD = null; private final Map nodes = new HashMap<>(); private final ConnectionPool[] slots = new ConnectionPool[Protocol.CLUSTER_HASHSLOTS]; @@ -49,7 +49,7 @@ public class JedisClusterInfoCache { /** * The single thread executor for the topology refresh task. */ - private ScheduledExecutorService topologyRefreshExecutor = null; + private final ScheduledExecutorService topologyRefreshExecutor; class TopologyRefreshTask implements Runnable { @Override @@ -61,12 +61,12 @@ public void run() { } public JedisClusterInfoCache(final JedisClientConfig clientConfig, final Set startNodes) { - this(clientConfig, null, startNodes, DEFAULT_TOPOLOGY_REFRESH_PERIOD); + this(clientConfig, null, startNodes); } public JedisClusterInfoCache(final JedisClientConfig clientConfig, final GenericObjectPoolConfig poolConfig, final Set startNodes) { - this(clientConfig, poolConfig, startNodes, DEFAULT_TOPOLOGY_REFRESH_PERIOD); + this(clientConfig, poolConfig, startNodes, null); } public JedisClusterInfoCache(final JedisClientConfig clientConfig, @@ -376,7 +376,7 @@ public void reset() { public void close() { reset(); - if (topologyRefreshPeriod != null && topologyRefreshExecutor != null) { + if (topologyRefreshExecutor != null) { logger.info("Cluster topology refresh shutdown, startNodes: {}", startNodes); topologyRefreshExecutor.shutdownNow(); } From a835964fcdc117f4f7c5f6b83496b7b3bcf079bf Mon Sep 17 00:00:00 2001 From: "bodong.ybd" Date: Tue, 31 Oct 2023 19:07:04 +0800 Subject: [PATCH 5/8] remove save topologyRefreshPeriod --- src/main/java/redis/clients/jedis/JedisClusterInfoCache.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/main/java/redis/clients/jedis/JedisClusterInfoCache.java b/src/main/java/redis/clients/jedis/JedisClusterInfoCache.java index b82d0a99e8..2602de4d87 100644 --- a/src/main/java/redis/clients/jedis/JedisClusterInfoCache.java +++ b/src/main/java/redis/clients/jedis/JedisClusterInfoCache.java @@ -44,12 +44,10 @@ public class JedisClusterInfoCache { private static final int MASTER_NODE_INDEX = 2; - private final Duration topologyRefreshPeriod; - /** * The single thread executor for the topology refresh task. */ - private final ScheduledExecutorService topologyRefreshExecutor; + private ScheduledExecutorService topologyRefreshExecutor = null; class TopologyRefreshTask implements Runnable { @Override @@ -75,7 +73,6 @@ public JedisClusterInfoCache(final JedisClientConfig clientConfig, this.poolConfig = poolConfig; this.clientConfig = clientConfig; this.startNodes = startNodes; - this.topologyRefreshPeriod = topologyRefreshPeriod; if (topologyRefreshPeriod != null) { logger.info("Cluster topology refresh start, period: {}, startNodes: {}", topologyRefreshPeriod, startNodes); topologyRefreshExecutor = Executors.newSingleThreadScheduledExecutor(); From dee72a2ae7d02eb3962a611c7e9fb3a281135bfa Mon Sep 17 00:00:00 2001 From: "bodong.ybd" Date: Tue, 31 Oct 2023 19:16:01 +0800 Subject: [PATCH 6/8] Move INIT_NO_ERROR_PROPERTY to JedisCluster --- .../redis/clients/jedis/JedisCluster.java | 2 ++ .../clients/jedis/JedisClusterInfoCache.java | 26 ++++++++++++------- .../providers/ClusterConnectionProvider.java | 4 +-- 3 files changed, 20 insertions(+), 12 deletions(-) diff --git a/src/main/java/redis/clients/jedis/JedisCluster.java b/src/main/java/redis/clients/jedis/JedisCluster.java index 736dfa2a5f..4a73df3d07 100644 --- a/src/main/java/redis/clients/jedis/JedisCluster.java +++ b/src/main/java/redis/clients/jedis/JedisCluster.java @@ -12,6 +12,8 @@ public class JedisCluster extends UnifiedJedis { + public static final String INIT_NO_ERROR_PROPERTY = "jedis.cluster.initNoError"; + /** * Default timeout in milliseconds. */ diff --git a/src/main/java/redis/clients/jedis/JedisClusterInfoCache.java b/src/main/java/redis/clients/jedis/JedisClusterInfoCache.java index 2602de4d87..bea4982fd4 100644 --- a/src/main/java/redis/clients/jedis/JedisClusterInfoCache.java +++ b/src/main/java/redis/clients/jedis/JedisClusterInfoCache.java @@ -25,6 +25,8 @@ import redis.clients.jedis.exceptions.JedisException; import redis.clients.jedis.util.SafeEncoder; +import static redis.clients.jedis.JedisCluster.INIT_NO_ERROR_PROPERTY; + public class JedisClusterInfoCache { private static final Logger logger = LoggerFactory.getLogger(JedisClusterInfoCache.class); @@ -106,11 +108,13 @@ private boolean checkClusterSlotSequence(List slotsInfo) { public void discoverClusterNodesAndSlots(Connection jedis) { List slotsInfo = executeClusterSlots(jedis); - if (slotsInfo.isEmpty()) { - throw new JedisClusterOperationException("Cluster slots list is empty."); - } - if (!checkClusterSlotSequence(slotsInfo)) { - throw new JedisClusterOperationException("Cluster slots have holes."); + if (System.getProperty(INIT_NO_ERROR_PROPERTY) == null) { + if (slotsInfo.isEmpty()) { + throw new JedisClusterOperationException("Cluster slots list is empty."); + } + if (!checkClusterSlotSequence(slotsInfo)) { + throw new JedisClusterOperationException("Cluster slots have holes."); + } } w.lock(); try { @@ -193,11 +197,13 @@ public void renewClusterSlots(Connection jedis) { private void discoverClusterSlots(Connection jedis) { List slotsInfo = executeClusterSlots(jedis); - if (slotsInfo.isEmpty()) { - throw new JedisClusterOperationException("Cluster slots list is empty."); - } - if (!checkClusterSlotSequence(slotsInfo)) { - throw new JedisClusterOperationException("Cluster slots have holes."); + if (System.getProperty(INIT_NO_ERROR_PROPERTY) == null) { + if (slotsInfo.isEmpty()) { + throw new JedisClusterOperationException("Cluster slots list is empty."); + } + if (!checkClusterSlotSequence(slotsInfo)) { + throw new JedisClusterOperationException("Cluster slots have holes."); + } } w.lock(); try { diff --git a/src/main/java/redis/clients/jedis/providers/ClusterConnectionProvider.java b/src/main/java/redis/clients/jedis/providers/ClusterConnectionProvider.java index c959de26c6..c21640713d 100644 --- a/src/main/java/redis/clients/jedis/providers/ClusterConnectionProvider.java +++ b/src/main/java/redis/clients/jedis/providers/ClusterConnectionProvider.java @@ -18,9 +18,9 @@ import redis.clients.jedis.exceptions.JedisClusterOperationException; import redis.clients.jedis.exceptions.JedisException; -public class ClusterConnectionProvider implements ConnectionProvider { +import static redis.clients.jedis.JedisCluster.INIT_NO_ERROR_PROPERTY; - private static final String INIT_NO_ERROR_PROPERTY = "jedis.cluster.initNoError"; +public class ClusterConnectionProvider implements ConnectionProvider { protected final JedisClusterInfoCache cache; From ac559405e3d0125e40a863c35fc96aab815db42d Mon Sep 17 00:00:00 2001 From: "bodong.ybd" Date: Tue, 31 Oct 2023 23:09:18 +0800 Subject: [PATCH 7/8] add timeout for clusterPeriodTopologyRefreshTest --- src/test/java/redis/clients/jedis/JedisClusterTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/test/java/redis/clients/jedis/JedisClusterTest.java b/src/test/java/redis/clients/jedis/JedisClusterTest.java index e5752fad43..8297eb90c6 100644 --- a/src/test/java/redis/clients/jedis/JedisClusterTest.java +++ b/src/test/java/redis/clients/jedis/JedisClusterTest.java @@ -741,17 +741,17 @@ public void clusterRefreshNodes() throws Exception { } } - @Test + @Test(timeout = 30_000) public void clusterPeriodTopologyRefreshTest() throws Exception { Set jedisClusterNode = new HashSet<>(); jedisClusterNode.add(nodeInfo1); jedisClusterNode.add(nodeInfo2); jedisClusterNode.add(nodeInfo3); - // we set topologyRefreshPeriod is 5s - Duration topologyRefreshPeriod = Duration.ofSeconds(3); + // we set topologyRefreshPeriod is 1s + Duration topologyRefreshPeriod = Duration.ofSeconds(1); try (JedisCluster cluster = new JedisCluster(jedisClusterNode, DEFAULT_CLIENT_CONFIG, DEFAULT_POOL_CONFIG, - topologyRefreshPeriod, DEFAULT_REDIRECTIONS, Duration.ofSeconds(1000))) { + topologyRefreshPeriod, DEFAULT_REDIRECTIONS, Duration.ofSeconds(10))) { assertEquals(3, cluster.getClusterNodes().size()); cleanUp(); // cleanup and add node4 From 9f9f677f5ff43ff3d1a03076bb12a7775faa733f Mon Sep 17 00:00:00 2001 From: "bodong.ybd" Date: Thu, 2 Nov 2023 20:06:22 +0800 Subject: [PATCH 8/8] Update src/main/java/redis/clients/jedis/DefaultJedisSocketFactory.java Co-authored-by: M Sazzadul Hoque <7600764+sazzad16@users.noreply.github.com> --- .../java/redis/clients/jedis/DefaultJedisSocketFactory.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/main/java/redis/clients/jedis/DefaultJedisSocketFactory.java b/src/main/java/redis/clients/jedis/DefaultJedisSocketFactory.java index 6acf24c13c..a2d963e221 100644 --- a/src/main/java/redis/clients/jedis/DefaultJedisSocketFactory.java +++ b/src/main/java/redis/clients/jedis/DefaultJedisSocketFactory.java @@ -70,6 +70,8 @@ private Socket connectToFirstSuccessfulHost(HostAndPort hostAndPort) throws Exce socket.setTcpNoDelay(true); // Socket buffer Whetherclosed, to ensure timely delivery of data socket.setSoLinger(true, 0); // Control calls close () method, the underlying socket is closed immediately + // Passing 'host' directly will avoid another call to InetAddress.getByName() inside the InetSocketAddress constructor. + // For machines with ipv4 and ipv6, but the startNode uses ipv4 to connect, the ipv6 connection may fail. socket.connect(new InetSocketAddress(host, hostAndPort.getPort()), connectionTimeout); return socket; } catch (Exception e) {