Skip to content
This repository has been archived by the owner on May 10, 2022. It is now read-only.

fix: gc task may encounter deadlock #168

Merged
merged 6 commits into from
Oct 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ public PegasusTableInterface openTable(String tableName, int backupRequestDelayM
* administrator
* @param tableOptions control the table feature, such as open backup-request, compress and etc,
* see {@link TableOptions}
* @return
* @throws PException
* @return the table handler
* @throws PException throws exception if any error occurs.
*/
public PegasusTableInterface openTable(String tableName, TableOptions tableOptions)
throws PException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class ClusterManager extends Cluster {
private ConcurrentHashMap<rpc_address, ReplicaSession> replicaSessions;
private EventLoopGroup metaGroup; // group used for handle meta logic
private EventLoopGroup replicaGroup; // group used for handle io with replica servers
private EventLoopGroup timeoutTaskGroup; // group used for handle timeout task in replica servers
private EventLoopGroup tableGroup; // group used for handle table logic
private String[] metaList;
private MetaSession metaSession;
Expand All @@ -72,6 +73,7 @@ public ClusterManager(ClientOptions opts) throws IllegalArgumentException {

replicaSessions = new ConcurrentHashMap<rpc_address, ReplicaSession>();
replicaGroup = getEventLoopGroupInstance(opts.getAsyncWorkers());
timeoutTaskGroup = getEventLoopGroupInstance(opts.getAsyncWorkers());
metaGroup = getEventLoopGroupInstance(1);
tableGroup = getEventLoopGroupInstance(1);
sessionInterceptorManager = new ReplicaSessionInterceptorManager(opts);
Expand Down Expand Up @@ -104,6 +106,7 @@ public ReplicaSession getReplicaSession(rpc_address address) {
new ReplicaSession(
address,
replicaGroup,
timeoutTaskGroup,
max(operationTimeout, ClientOptions.MIN_SOCK_CONNECT_TIMEOUT),
sessionResetTimeWindowSecs,
sessionInterceptorManager);
Expand Down Expand Up @@ -169,6 +172,7 @@ public void close() {
Future metaGroupFuture = metaGroup.shutdownGracefully();
Future replicaGroupFuture = replicaGroup.shutdownGracefully();
Future tableGroupFuture = tableGroup.shutdownGracefully();
Future timeoutTaskGroupFuture = timeoutTaskGroup.shutdownGracefully();

try {
metaGroupFuture.sync();
Expand All @@ -191,6 +195,13 @@ public void close() {
logger.warn("close table group failed: ", ex);
}

try {
timeoutTaskGroupFuture.sync();
logger.info("timeout task group has closed");
} catch (Exception ex) {
logger.warn("close timeout task group failed: ", ex);
}

logger.info("cluster manager has closed");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,12 @@ public enum ConnState {
public ReplicaSession(
rpc_address address,
EventLoopGroup rpcGroup,
EventLoopGroup timeoutTaskGroup,
int socketTimeout,
long sessionResetTimeWindowSec,
ReplicaSessionInterceptorManager interceptorManager) {
this.address = address;
this.rpcGroup = rpcGroup;
this.timeoutTaskGroup = timeoutTaskGroup;
this.interceptorManager = interceptorManager;
this.sessionResetTimeWindowMs = sessionResetTimeWindowSec * 1000;

Expand Down Expand Up @@ -358,7 +359,7 @@ public void operationComplete(ChannelFuture channelFuture) throws Exception {
// this task will be cancelled.
// TODO(wutao1): call it addTimeoutTicker
private ScheduledFuture<?> addTimer(final int seqID, long timeoutInMillseconds) {
return rpcGroup.schedule(
return this.timeoutTaskGroup.schedule(
new Runnable() {
@Override
public void run() {
Expand Down Expand Up @@ -473,7 +474,7 @@ static final class VolatileFields {

private final rpc_address address;
private Bootstrap boot;
private EventLoopGroup rpcGroup;
private EventLoopGroup timeoutTaskGroup;
private ReplicaSessionInterceptorManager interceptorManager;
private boolean authSucceed;
final Queue<RequestEntry> authPendingSend = new LinkedList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public void testPingZK() throws PException {
System.out.println("write config to " + configPath);
zkClient.writeData(zkPath, configData.getBytes());

PegasusClientInterface client = PegasusClientFactory.getSingletonClient(configPath);
PegasusClientInterface client = PegasusClientFactory.createClient(configPath);
String tableName = "temp";

byte[] hashKey = "hello".getBytes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,8 +295,10 @@ public void testSessionConnectTimeout() throws InterruptedException {

long start = System.currentTimeMillis();
EventLoopGroup rpcGroup = new NioEventLoopGroup(4);
EventLoopGroup timeoutTaskGroup = new NioEventLoopGroup(4);
ReplicaSession rs =
new ReplicaSession(addr, rpcGroup, 1000, 30, (ReplicaSessionInterceptorManager) null);
new ReplicaSession(
addr, rpcGroup, timeoutTaskGroup, 1000, 30, (ReplicaSessionInterceptorManager) null);
rs.tryConnect().awaitUninterruptibly();
long end = System.currentTimeMillis();
Assert.assertEquals((end - start) / 1000, 1); // ensure connect failed within 1sec
Expand Down