Skip to content
This repository has been archived by the owner on May 10, 2022. It is now read-only.

Commit

Permalink
fix: gc task may encounter deadlock (#168)
Browse files Browse the repository at this point in the history
  • Loading branch information
cauchy1988 authored Oct 26, 2021
1 parent b3d80c9 commit e2cc860
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ public PegasusTableInterface openTable(String tableName, int backupRequestDelayM
* administrator
* @param tableOptions control the table feature, such as open backup-request, compress and etc,
* see {@link TableOptions}
* @return
* @throws PException
* @return the table handler
* @throws PException throws exception if any error occurs.
*/
public PegasusTableInterface openTable(String tableName, TableOptions tableOptions)
throws PException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class ClusterManager extends Cluster {
private ConcurrentHashMap<rpc_address, ReplicaSession> replicaSessions;
private EventLoopGroup metaGroup; // group used for handle meta logic
private EventLoopGroup replicaGroup; // group used for handle io with replica servers
private EventLoopGroup timeoutTaskGroup; // group used for handle timeout task in replica servers
private EventLoopGroup tableGroup; // group used for handle table logic
private String[] metaList;
private MetaSession metaSession;
Expand All @@ -72,6 +73,7 @@ public ClusterManager(ClientOptions opts) throws IllegalArgumentException {

replicaSessions = new ConcurrentHashMap<rpc_address, ReplicaSession>();
replicaGroup = getEventLoopGroupInstance(opts.getAsyncWorkers());
timeoutTaskGroup = getEventLoopGroupInstance(opts.getAsyncWorkers());
metaGroup = getEventLoopGroupInstance(1);
tableGroup = getEventLoopGroupInstance(1);
sessionInterceptorManager = new ReplicaSessionInterceptorManager(opts);
Expand Down Expand Up @@ -104,6 +106,7 @@ public ReplicaSession getReplicaSession(rpc_address address) {
new ReplicaSession(
address,
replicaGroup,
timeoutTaskGroup,
max(operationTimeout, ClientOptions.MIN_SOCK_CONNECT_TIMEOUT),
sessionResetTimeWindowSecs,
sessionInterceptorManager);
Expand Down Expand Up @@ -169,6 +172,7 @@ public void close() {
Future metaGroupFuture = metaGroup.shutdownGracefully();
Future replicaGroupFuture = replicaGroup.shutdownGracefully();
Future tableGroupFuture = tableGroup.shutdownGracefully();
Future timeoutTaskGroupFuture = timeoutTaskGroup.shutdownGracefully();

try {
metaGroupFuture.sync();
Expand All @@ -191,6 +195,13 @@ public void close() {
logger.warn("close table group failed: ", ex);
}

try {
timeoutTaskGroupFuture.sync();
logger.info("timeout task group has closed");
} catch (Exception ex) {
logger.warn("close timeout task group failed: ", ex);
}

logger.info("cluster manager has closed");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,12 @@ public enum ConnState {
public ReplicaSession(
rpc_address address,
EventLoopGroup rpcGroup,
EventLoopGroup timeoutTaskGroup,
int socketTimeout,
long sessionResetTimeWindowSec,
ReplicaSessionInterceptorManager interceptorManager) {
this.address = address;
this.rpcGroup = rpcGroup;
this.timeoutTaskGroup = timeoutTaskGroup;
this.interceptorManager = interceptorManager;
this.sessionResetTimeWindowMs = sessionResetTimeWindowSec * 1000;

Expand Down Expand Up @@ -358,7 +359,7 @@ public void operationComplete(ChannelFuture channelFuture) throws Exception {
// this task will be cancelled.
// TODO(wutao1): call it addTimeoutTicker
private ScheduledFuture<?> addTimer(final int seqID, long timeoutInMillseconds) {
return rpcGroup.schedule(
return this.timeoutTaskGroup.schedule(
new Runnable() {
@Override
public void run() {
Expand Down Expand Up @@ -473,7 +474,7 @@ static final class VolatileFields {

private final rpc_address address;
private Bootstrap boot;
private EventLoopGroup rpcGroup;
private EventLoopGroup timeoutTaskGroup;
private ReplicaSessionInterceptorManager interceptorManager;
private boolean authSucceed;
final Queue<RequestEntry> authPendingSend = new LinkedList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public void testPingZK() throws PException {
System.out.println("write config to " + configPath);
zkClient.writeData(zkPath, configData.getBytes());

PegasusClientInterface client = PegasusClientFactory.getSingletonClient(configPath);
PegasusClientInterface client = PegasusClientFactory.createClient(configPath);
String tableName = "temp";

byte[] hashKey = "hello".getBytes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,8 +295,10 @@ public void testSessionConnectTimeout() throws InterruptedException {

long start = System.currentTimeMillis();
EventLoopGroup rpcGroup = new NioEventLoopGroup(4);
EventLoopGroup timeoutTaskGroup = new NioEventLoopGroup(4);
ReplicaSession rs =
new ReplicaSession(addr, rpcGroup, 1000, 30, (ReplicaSessionInterceptorManager) null);
new ReplicaSession(
addr, rpcGroup, timeoutTaskGroup, 1000, 30, (ReplicaSessionInterceptorManager) null);
rs.tryConnect().awaitUninterruptibly();
long end = System.currentTimeMillis();
Assert.assertEquals((end - start) / 1000, 1); // ensure connect failed within 1sec
Expand Down

0 comments on commit e2cc860

Please sign in to comment.