diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java index 3af2de70..529580c2 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusClientInterface.java @@ -69,8 +69,8 @@ public PegasusTableInterface openTable(String tableName, int backupRequestDelayM * administrator * @param tableOptions control the table feature, such as open backup-request, compress and etc, * see {@link TableOptions} - * @return - * @throws PException + * @return the table handler + * @throws PException throws exception if any error occurs. */ public PegasusTableInterface openTable(String tableName, TableOptions tableOptions) throws PException; diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManager.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManager.java index 25e9db5e..cc57b78e 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManager.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ClusterManager.java @@ -48,6 +48,7 @@ public class ClusterManager extends Cluster { private ConcurrentHashMap replicaSessions; private EventLoopGroup metaGroup; // group used for handle meta logic private EventLoopGroup replicaGroup; // group used for handle io with replica servers + private EventLoopGroup timeoutTaskGroup; // group used for handle timeout task in replica servers private EventLoopGroup tableGroup; // group used for handle table logic private String[] metaList; private MetaSession metaSession; @@ -72,6 +73,7 @@ public ClusterManager(ClientOptions opts) throws IllegalArgumentException { replicaSessions = new ConcurrentHashMap(); replicaGroup = getEventLoopGroupInstance(opts.getAsyncWorkers()); + timeoutTaskGroup = getEventLoopGroupInstance(opts.getAsyncWorkers()); metaGroup = getEventLoopGroupInstance(1); tableGroup = getEventLoopGroupInstance(1); sessionInterceptorManager = new ReplicaSessionInterceptorManager(opts); @@ -104,6 +106,7 @@ public ReplicaSession getReplicaSession(rpc_address address) { new ReplicaSession( address, replicaGroup, + timeoutTaskGroup, max(operationTimeout, ClientOptions.MIN_SOCK_CONNECT_TIMEOUT), sessionResetTimeWindowSecs, sessionInterceptorManager); @@ -169,6 +172,7 @@ public void close() { Future metaGroupFuture = metaGroup.shutdownGracefully(); Future replicaGroupFuture = replicaGroup.shutdownGracefully(); Future tableGroupFuture = tableGroup.shutdownGracefully(); + Future timeoutTaskGroupFuture = timeoutTaskGroup.shutdownGracefully(); try { metaGroupFuture.sync(); @@ -191,6 +195,13 @@ public void close() { logger.warn("close table group failed: ", ex); } + try { + timeoutTaskGroupFuture.sync(); + logger.info("timeout task group has closed"); + } catch (Exception ex) { + logger.warn("close timeout task group failed: ", ex); + } + logger.info("cluster manager has closed"); } } diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java index a8f34e74..66ffaeaf 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSession.java @@ -57,11 +57,12 @@ public enum ConnState { public ReplicaSession( rpc_address address, EventLoopGroup rpcGroup, + EventLoopGroup timeoutTaskGroup, int socketTimeout, long sessionResetTimeWindowSec, ReplicaSessionInterceptorManager interceptorManager) { this.address = address; - this.rpcGroup = rpcGroup; + this.timeoutTaskGroup = timeoutTaskGroup; this.interceptorManager = interceptorManager; this.sessionResetTimeWindowMs = sessionResetTimeWindowSec * 1000; @@ -358,7 +359,7 @@ public void operationComplete(ChannelFuture channelFuture) throws Exception { // this task will be cancelled. // TODO(wutao1): call it addTimeoutTicker private ScheduledFuture addTimer(final int seqID, long timeoutInMillseconds) { - return rpcGroup.schedule( + return this.timeoutTaskGroup.schedule( new Runnable() { @Override public void run() { @@ -473,7 +474,7 @@ static final class VolatileFields { private final rpc_address address; private Bootstrap boot; - private EventLoopGroup rpcGroup; + private EventLoopGroup timeoutTaskGroup; private ReplicaSessionInterceptorManager interceptorManager; private boolean authSucceed; final Queue authPendingSend = new LinkedList<>(); diff --git a/src/test/java/com/xiaomi/infra/pegasus/client/TestPingZK.java b/src/test/java/com/xiaomi/infra/pegasus/client/TestPingZK.java index ead83468..98b3b4a5 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/client/TestPingZK.java +++ b/src/test/java/com/xiaomi/infra/pegasus/client/TestPingZK.java @@ -37,7 +37,7 @@ public void testPingZK() throws PException { System.out.println("write config to " + configPath); zkClient.writeData(zkPath, configData.getBytes()); - PegasusClientInterface client = PegasusClientFactory.getSingletonClient(configPath); + PegasusClientInterface client = PegasusClientFactory.createClient(configPath); String tableName = "temp"; byte[] hashKey = "hello".getBytes(); diff --git a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSessionTest.java b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSessionTest.java index 155a6180..4fd1cb14 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSessionTest.java +++ b/src/test/java/com/xiaomi/infra/pegasus/rpc/async/ReplicaSessionTest.java @@ -295,8 +295,10 @@ public void testSessionConnectTimeout() throws InterruptedException { long start = System.currentTimeMillis(); EventLoopGroup rpcGroup = new NioEventLoopGroup(4); + EventLoopGroup timeoutTaskGroup = new NioEventLoopGroup(4); ReplicaSession rs = - new ReplicaSession(addr, rpcGroup, 1000, 30, (ReplicaSessionInterceptorManager) null); + new ReplicaSession( + addr, rpcGroup, timeoutTaskGroup, 1000, 30, (ReplicaSessionInterceptorManager) null); rs.tryConnect().awaitUninterruptibly(); long end = System.currentTimeMillis(); Assert.assertEquals((end - start) / 1000, 1); // ensure connect failed within 1sec