Skip to content
This repository has been archived by the owner on May 10, 2022. It is now read-only.

close session when it's not responding for a while #32

Merged
merged 4 commits into from
Jan 17, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
* Created by weijiesun on 17-9-13.
Expand Down Expand Up @@ -62,6 +63,8 @@ public void initChannel(SocketChannel ch) {
pipeline.addLast("ClientHandler", new ReplicaSession.DefaultHandler());
}
});

this.firstRecentTimedOutMs = new AtomicLong(0);
}

// You can specify a message response filter with constructor or with "setMessageResponseFilter" function.
Expand All @@ -71,6 +74,7 @@ public ReplicaSession(rpc_address address, EventLoopGroup rpcGroup, int socketTi
this(address, rpcGroup, socketTimeout);
this.filter = filter;
}

public void setMessageResponseFilter(MessageResponseFilter filter) {
this.filter = filter;
}
Expand Down Expand Up @@ -220,8 +224,25 @@ private void tryNotifyWithSequenceID(
entry.timeoutTask.cancel(true);
entry.op.rpc_error.errno = errno;
entry.callback.run();
}
else {

if (errno == error_types.ERR_TIMEOUT) {
long firstTs = firstRecentTimedOutMs.get();
if (firstTs == 0) {
// it is the first timeout in the window.
firstRecentTimedOutMs.set(System.currentTimeMillis());
} else if (System.currentTimeMillis() - firstTs >= sessionResetTimeWindowMs) {
// ensure that closeSession() will be invoked only once.
if (firstRecentTimedOutMs.compareAndSet(firstTs, 0)) {
logger.warn("{}: actively close the session because it's not responding for {} seconds",
name(),
sessionResetTimeWindowMs / 1000);
closeSession();
}
}
} else {
firstRecentTimedOutMs.set(0);
}
} else {
logger.warn("{}: {} is removed by others, current error {}, isTimeoutTask {}",
name(), seqID, errno.toString(), isTimeoutTask);
}
Expand Down Expand Up @@ -305,11 +326,19 @@ private final static class VolatileFields {
public ConnState state = ConnState.DISCONNECTED;
public Channel nettyChannel = null;
}

private volatile VolatileFields fields = new VolatileFields();

private rpc_address address;
private final rpc_address address;
private Bootstrap boot;
private EventLoopGroup rpcGroup;

// Session will be actively closed if all the rpcs across `sessionResetTimeWindowMs`
// are timed out, in that case we suspect that the server is unavailable.

// Timestamp of the first timed out rpc.
private AtomicLong firstRecentTimedOutMs;
private static final long sessionResetTimeWindowMs = 10 * 1000; // 10s

private static final Logger logger = org.slf4j.LoggerFactory.getLogger(ReplicaSession.class);
}