diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java index f7b5239c793..510e1f6c233 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java @@ -50,8 +50,6 @@ public class DefaultFuture extends CompletableFuture { private static final Map FUTURES = new ConcurrentHashMap<>(); -// private static final Map PENDING_TASKS = new ConcurrentHashMap<>(); - public static final Timer TIME_OUT_TIMER = new HashedWheelTimer( new NamedThreadFactory("dubbo-future-timeout", true), 30, @@ -64,6 +62,7 @@ public class DefaultFuture extends CompletableFuture { private final int timeout; private final long start = System.currentTimeMillis(); private volatile long sent; + private Timeout timeoutCheckTask; private ExecutorService executor; @@ -85,6 +84,14 @@ private DefaultFuture(Channel channel, Request request, int timeout) { CHANNELS.put(id, channel); } + /** + * check time out of the future + */ + private static void timeoutCheck(DefaultFuture future) { + TimeoutCheckTask task = new TimeoutCheckTask(future.getId()); + future.timeoutCheckTask = TIME_OUT_TIMER.newTimeout(task, future.getTimeout(), TimeUnit.MILLISECONDS); + } + /** * init a DefaultFuture * 1.init a DefaultFuture @@ -142,15 +149,19 @@ public static void closeChannel(Channel channel) { } public static void received(Channel channel, Response response) { + received(channel, response, false); + } + + public static void received(Channel channel, Response response, boolean timeout) { try { DefaultFuture future = FUTURES.remove(response.getId()); if (future != null) { + Timeout t = future.timeoutCheckTask; + if (!timeout) { + // decrease Time + t.cancel(); + } future.doReceived(response); -// Timeout t = PENDING_TASKS.remove(future.getId()); -// if (t != null) { -// // decrease Time -// t.cancel(); -// } } else { logger.warn("The timeout response finally returned at " + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) @@ -178,6 +189,7 @@ public void cancel() { this.cancel(true); } + private void doReceived(Response res) { if (res == null) { throw new IllegalStateException("response cannot be null"); @@ -224,15 +236,6 @@ private void doSent() { sent = System.currentTimeMillis(); } - /** - * check time out of the future - */ - private static void timeoutCheck(DefaultFuture future) { - TimeoutCheckTask task = new TimeoutCheckTask(future.getId()); - TIME_OUT_TIMER.newTimeout(task, future.getTimeout(), TimeUnit.MILLISECONDS); -// PENDING_TASKS.put(future.getId(), t); - } - private String getTimeoutMessage(boolean scan) { long nowTimestamp = System.currentTimeMillis(); return (sent > 0 ? "Waiting server-side response timeout" : "Sending request timeout in client-side") @@ -256,10 +259,7 @@ private static class TimeoutCheckTask implements TimerTask { @Override public void run(Timeout timeout) { - // remove from pending task -// PENDING_TASKS.remove(future.getId()); - - DefaultFuture future = FUTURES.remove(requestID); + DefaultFuture future = DefaultFuture.getFuture(requestID); if (future == null || future.isDone()) { return; } @@ -271,7 +271,7 @@ public void run(Timeout timeout) { timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT); timeoutResponse.setErrorMessage(future.getTimeoutMessage(true)); // handle response. - DefaultFuture.received(future.getChannel(), timeoutResponse); + DefaultFuture.received(future.getChannel(), timeoutResponse, true); }); } }