From 35f1914782c11283a7c36efb44a8d71729b34124 Mon Sep 17 00:00:00 2001 From: Ian Luo Date: Thu, 31 Jan 2019 10:25:30 +0800 Subject: [PATCH] Merge pull request #3341, start to use IdleStateHandler in Netty4. --- .../apache/dubbo/common/utils/UrlUtils.java | 13 ++ .../org/apache/dubbo/remoting/Client.java | 4 +- .../apache/dubbo/remoting/IdleSensible.java | 37 +++++ .../org/apache/dubbo/remoting/Server.java | 4 +- .../support/header/HeaderExchangeClient.java | 71 ++++----- .../support/header/HeaderExchangeServer.java | 56 +++----- .../support/header/ReconnectTimerTask.java | 13 +- .../remoting/transport/AbstractClient.java | 135 +----------------- .../support/header/HeartbeatHandlerTest.java | 3 + .../transport/netty/ClientReconnectTest.java | 31 +--- .../transport/netty4/NettyClient.java | 12 +- .../transport/netty4/NettyClientHandler.java | 35 ++++- .../transport/netty4/NettyServer.java | 12 ++ .../transport/netty4/NettyServerHandler.java | 20 ++- .../transport/netty4/ClientReconnectTest.java | 4 +- 15 files changed, 209 insertions(+), 241 deletions(-) create mode 100644 dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/IdleSensible.java diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/UrlUtils.java b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/UrlUtils.java index 5c157c5a03f..23b50339f4c 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/UrlUtils.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/UrlUtils.java @@ -467,6 +467,19 @@ public static boolean isProvider(URL url) { PROVIDERS_CATEGORY.equals(url.getParameter(CATEGORY_KEY, PROVIDERS_CATEGORY)); } + public static int getHeartbeat(URL url) { + return url.getParameter(Constants.HEARTBEAT_KEY, Constants.DEFAULT_HEARTBEAT); + } + + public static int getIdleTimeout(URL url) { + int heartBeat = getHeartbeat(url); + int idleTimeout = url.getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartBeat * 3); + if (idleTimeout < heartBeat * 2) { + throw new IllegalStateException("idleTimeout < heartbeatInterval * 2"); + } + return idleTimeout; + } + /** * Check if the given value matches the given pattern. The pattern supports wildcard "*". * diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/Client.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/Client.java index b8ccad201c8..7f15535353c 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/Client.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/Client.java @@ -25,7 +25,7 @@ * * @see org.apache.dubbo.remoting.Transporter#connect(org.apache.dubbo.common.URL, ChannelHandler) */ -public interface Client extends Endpoint, Channel, Resetable { +public interface Client extends Endpoint, Channel, Resetable, IdleSensible { /** * reconnect. @@ -35,4 +35,4 @@ public interface Client extends Endpoint, Channel, Resetable { @Deprecated void reset(org.apache.dubbo.common.Parameters parameters); -} \ No newline at end of file +} diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/IdleSensible.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/IdleSensible.java new file mode 100644 index 00000000000..14e371e60d6 --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/IdleSensible.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.dubbo.remoting; + +/** + * Indicate whether the implementation (for both server and client) has the ability to sense and handle idle connection. + * If the server has the ability to handle idle connection, it should close the connection when it happens, and if + * the client has the ability to handle idle connection, it should send the heartbeat to the server. + */ +public interface IdleSensible { + /** + * Whether the implementation can sense and handle the idle connection. By default it's false, the implementation + * relies on dedicated timer to take care of idle connection. + * + * @return whether has the ability to handle idle connection + */ + default boolean canHandleIdle() { + return false; + } +} diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/Server.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/Server.java index f413cb0bb85..c6ecf16d44b 100755 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/Server.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/Server.java @@ -28,7 +28,7 @@ * * @see org.apache.dubbo.remoting.Transporter#bind(org.apache.dubbo.common.URL, ChannelHandler) */ -public interface Server extends Endpoint, Resetable { +public interface Server extends Endpoint, Resetable, IdleSensible { /** * is bound. @@ -55,4 +55,4 @@ public interface Server extends Endpoint, Resetable { @Deprecated void reset(org.apache.dubbo.common.Parameters parameters); -} \ No newline at end of file +} diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java index 65862b6d6af..3e57fba90c4 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java @@ -33,6 +33,9 @@ import java.util.Collections; import java.util.concurrent.TimeUnit; +import static org.apache.dubbo.common.utils.UrlUtils.getHeartbeat; +import static org.apache.dubbo.common.utils.UrlUtils.getIdleTimeout; + /** * DefaultMessageClient */ @@ -40,31 +43,21 @@ public class HeaderExchangeClient implements ExchangeClient { private final Client client; private final ExchangeChannel channel; - private int heartbeat; - private int idleTimeout; - - private static final HashedWheelTimer IDLE_CHECK_TIMER = new HashedWheelTimer(new NamedThreadFactory("dubbo-client-idleCheck", true), 1, - TimeUnit.SECONDS, Constants.TICKS_PER_WHEEL); + private static final HashedWheelTimer IDLE_CHECK_TIMER = new HashedWheelTimer( + new NamedThreadFactory("dubbo-client-idleCheck", true), 1, TimeUnit.SECONDS, Constants.TICKS_PER_WHEEL); private HeartbeatTimerTask heartBeatTimerTask; - private ReconnectTimerTask reconnectTimerTask; - public HeaderExchangeClient(Client client, boolean needHeartbeat) { + public HeaderExchangeClient(Client client, boolean startTimer) { Assert.notNull(client, "Client can't be null"); this.client = client; this.channel = new HeaderExchangeChannel(client); - String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY); - - this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null && - dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0); - this.idleTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3); - if (idleTimeout < heartbeat * 2) { - throw new IllegalStateException("idleTimeout < heartbeatInterval * 2"); - } - if (needHeartbeat) { - startIdleCheckTask(); + if (startTimer) { + URL url = client.getUrl(); + startReconnectTask(url); + startHeartBeatTask(url); } } @@ -145,6 +138,7 @@ public void startClose() { @Override public void reset(URL url) { client.reset(url); + // FIXME, should cancel and restart timer tasks if parameters in the new URL are different? } @Override @@ -178,25 +172,34 @@ public boolean hasAttribute(String key) { return channel.hasAttribute(key); } - private void startIdleCheckTask() { - AbstractTimerTask.ChannelProvider cp = () -> Collections.singletonList(HeaderExchangeClient.this); - - long heartbeatTick = calculateLeastDuration(heartbeat); - long heartbeatTimeoutTick = calculateLeastDuration(idleTimeout); - HeartbeatTimerTask heartBeatTimerTask = new HeartbeatTimerTask(cp, heartbeatTick, heartbeat); - ReconnectTimerTask reconnectTimerTask = new ReconnectTimerTask(cp, heartbeatTimeoutTick, idleTimeout); - - this.heartBeatTimerTask = heartBeatTimerTask; - this.reconnectTimerTask = reconnectTimerTask; + private void startHeartBeatTask(URL url) { + if (!client.canHandleIdle()) { + AbstractTimerTask.ChannelProvider cp = () -> Collections.singletonList(HeaderExchangeClient.this); + int heartbeat = getHeartbeat(url); + long heartbeatTick = calculateLeastDuration(heartbeat); + this.heartBeatTimerTask = new HeartbeatTimerTask(cp, heartbeatTick, heartbeat); + IDLE_CHECK_TIMER.newTimeout(heartBeatTimerTask, heartbeatTick, TimeUnit.MILLISECONDS); + } + } - // init task and start timer. - IDLE_CHECK_TIMER.newTimeout(heartBeatTimerTask, heartbeatTick, TimeUnit.MILLISECONDS); - IDLE_CHECK_TIMER.newTimeout(reconnectTimerTask, heartbeatTimeoutTick, TimeUnit.MILLISECONDS); + private void startReconnectTask(URL url) { + if (shouldReconnect(url)) { + AbstractTimerTask.ChannelProvider cp = () -> Collections.singletonList(HeaderExchangeClient.this); + int idleTimeout = getIdleTimeout(url); + long heartbeatTimeoutTick = calculateLeastDuration(idleTimeout); + this.reconnectTimerTask = new ReconnectTimerTask(cp, heartbeatTimeoutTick, idleTimeout); + IDLE_CHECK_TIMER.newTimeout(reconnectTimerTask, heartbeatTimeoutTick, TimeUnit.MILLISECONDS); + } } private void doClose() { - heartBeatTimerTask.cancel(); - reconnectTimerTask.cancel(); + if (heartBeatTimerTask != null) { + heartBeatTimerTask.cancel(); + } + + if (reconnectTimerTask != null) { + reconnectTimerTask.cancel(); + } } /** @@ -210,6 +213,10 @@ private long calculateLeastDuration(int time) { } } + private boolean shouldReconnect(URL url) { + return url.getParameter(Constants.RECONNECT_KEY, true); + } + @Override public String toString() { return "HeaderExchangeClient [channel=" + channel + "]"; diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeServer.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeServer.java index 65b0836fcc8..8a74e234134 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeServer.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeServer.java @@ -25,6 +25,7 @@ import org.apache.dubbo.common.utils.Assert; import org.apache.dubbo.common.utils.CollectionUtils; import org.apache.dubbo.common.utils.NamedThreadFactory; +import org.apache.dubbo.common.utils.UrlUtils; import org.apache.dubbo.remoting.Channel; import org.apache.dubbo.remoting.ChannelHandler; import org.apache.dubbo.remoting.RemotingException; @@ -49,8 +50,6 @@ public class HeaderExchangeServer implements ExchangeServer { protected final Logger logger = LoggerFactory.getLogger(getClass()); private final Server server; - private int heartbeat; - private int idleTimeout; private AtomicBoolean closed = new AtomicBoolean(false); private static final HashedWheelTimer IDLE_CHECK_TIMER = new HashedWheelTimer(new NamedThreadFactory("dubbo-server-idleCheck", true), 1, @@ -61,13 +60,7 @@ public class HeaderExchangeServer implements ExchangeServer { public HeaderExchangeServer(Server server) { Assert.notNull(server, "server == null"); this.server = server; - this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0); - this.idleTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3); - if (idleTimeout < heartbeat * 2) { - throw new IllegalStateException("idleTimeout < heartbeatInterval * 2"); - } - - startIdleCheckTask(); + startIdleCheckTask(getUrl()); } public Server getServer() { @@ -154,7 +147,9 @@ private void doClose() { } private void cancelCloseTask() { - closeTimerTask.cancel(); + if (closeTimerTask != null) { + closeTimerTask.cancel(); + } } @Override @@ -210,21 +205,13 @@ public ChannelHandler getChannelHandler() { public void reset(URL url) { server.reset(url); try { - if (url.hasParameter(Constants.HEARTBEAT_KEY) - || url.hasParameter(Constants.HEARTBEAT_TIMEOUT_KEY)) { - int h = url.getParameter(Constants.HEARTBEAT_KEY, heartbeat); - int t = url.getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, h * 3); - if (t < h * 2) { - throw new IllegalStateException("idleTimeout < heartbeatInterval * 2"); - } - if (h != heartbeat || t != idleTimeout) { - heartbeat = h; - idleTimeout = t; - - // we need cancel the exist closeTimeout first. - cancelCloseTask(); - startIdleCheckTask(); - } + int currHeartbeat = UrlUtils.getHeartbeat(getUrl()); + int currIdleTimeout = UrlUtils.getIdleTimeout(getUrl()); + int heartbeat = UrlUtils.getHeartbeat(url); + int idleTimeout = UrlUtils.getIdleTimeout(url); + if (currHeartbeat != heartbeat || currIdleTimeout != idleTimeout) { + cancelCloseTask(); + startIdleCheckTask(url); } } catch (Throwable t) { logger.error(t.getMessage(), t); @@ -266,15 +253,16 @@ private long calculateLeastDuration(int time) { } } - private void startIdleCheckTask() { - AbstractTimerTask.ChannelProvider cp = () -> unmodifiableCollection(HeaderExchangeServer.this.getChannels()); - - long idleTimeoutTick = calculateLeastDuration(idleTimeout); - CloseTimerTask closeTimerTask = new CloseTimerTask(cp, idleTimeoutTick, idleTimeout); - this.closeTimerTask = closeTimerTask; + private void startIdleCheckTask(URL url) { + if (!server.canHandleIdle()) { + AbstractTimerTask.ChannelProvider cp = () -> unmodifiableCollection(HeaderExchangeServer.this.getChannels()); + int idleTimeout = UrlUtils.getIdleTimeout(url); + long idleTimeoutTick = calculateLeastDuration(idleTimeout); + CloseTimerTask closeTimerTask = new CloseTimerTask(cp, idleTimeoutTick, idleTimeout); + this.closeTimerTask = closeTimerTask; - // init task and start timer. - IDLE_CHECK_TIMER.newTimeout(closeTimerTask, idleTimeoutTick, TimeUnit.MILLISECONDS); + // init task and start timer. + IDLE_CHECK_TIMER.newTimeout(closeTimerTask, idleTimeoutTick, TimeUnit.MILLISECONDS); + } } - } diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/ReconnectTimerTask.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/ReconnectTimerTask.java index 3c0e9389db8..5fd8c90b956 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/ReconnectTimerTask.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/ReconnectTimerTask.java @@ -41,9 +41,18 @@ protected void doTask(Channel channel) { try { Long lastRead = lastRead(channel); Long now = now(); + + // Rely on reconnect timer to reconnect when AbstractClient.doConnect fails to init the connection + if (!channel.isConnected()) { + try { + logger.info("Initial connection to " + channel); + ((Client) channel).reconnect(); + } catch (Exception e) { + logger.error("Fail to connect to " + channel, e); + } // check pong at client - if (lastRead != null && now - lastRead > idleTimeout) { - logger.warn("Close channel " + channel + ", because heartbeat read idle time out: " + } else if (lastRead != null && now - lastRead > idleTimeout) { + logger.warn("Reconnect to channel " + channel + ", because heartbeat read idle time out: " + idleTimeout + "ms"); try { ((Client) channel).reconnect(); diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java index 7280b508f5e..4138398f235 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java @@ -24,9 +24,7 @@ import org.apache.dubbo.common.logger.LoggerFactory; import org.apache.dubbo.common.store.DataStore; import org.apache.dubbo.common.utils.ExecutorUtil; -import org.apache.dubbo.common.utils.NamedThreadFactory; import org.apache.dubbo.common.utils.NetUtils; -import org.apache.dubbo.common.utils.StringUtils; import org.apache.dubbo.remoting.Channel; import org.apache.dubbo.remoting.ChannelHandler; import org.apache.dubbo.remoting.Client; @@ -35,12 +33,6 @@ import java.net.InetSocketAddress; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -51,31 +43,14 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client protected static final String CLIENT_THREAD_POOL_NAME = "DubboClientHandler"; private static final Logger logger = LoggerFactory.getLogger(AbstractClient.class); - private static final AtomicInteger CLIENT_THREAD_POOL_ID = new AtomicInteger(); - private static final ScheduledThreadPoolExecutor reconnectExecutorService = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("DubboClientReconnectTimer", true)); private final Lock connectLock = new ReentrantLock(); - private final boolean send_reconnect; - private final AtomicInteger reconnect_count = new AtomicInteger(0); - // Reconnection error log has been called before? - private final AtomicBoolean reconnect_error_log_flag = new AtomicBoolean(false); - // reconnect warning period. Reconnect warning interval (log warning after how many times) //for test - private final int reconnect_warning_period; - private final long shutdown_timeout; + private final boolean needReconnect; protected volatile ExecutorService executor; - private volatile ScheduledFuture reconnectExecutorFuture = null; - // the last successed connected time - private long lastConnectedTime = System.currentTimeMillis(); - public AbstractClient(URL url, ChannelHandler handler) throws RemotingException { super(url, handler); - send_reconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false); - - shutdown_timeout = url.getParameter(Constants.SHUTDOWN_TIMEOUT_KEY, Constants.DEFAULT_SHUTDOWN_TIMEOUT); - - // The default reconnection interval is 2s, 1800 means warning interval is 1 hour. - reconnect_warning_period = url.getParameter("reconnect.waring.period", 1800); + needReconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false); try { doOpen(); @@ -118,105 +93,6 @@ protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handl return ChannelHandlers.wrap(handler, url); } - /** - * @param url - * @return 0-false - */ - private static int getReconnectParam(URL url) { - int reconnect; - String param = url.getParameter(Constants.RECONNECT_KEY); - if (StringUtils.isEmpty(param) || "true".equalsIgnoreCase(param)) { - reconnect = Constants.DEFAULT_RECONNECT_PERIOD; - } else if ("false".equalsIgnoreCase(param)) { - reconnect = 0; - } else { - try { - reconnect = Integer.parseInt(param); - } catch (Exception e) { - throw new IllegalArgumentException("reconnect param must be nonnegative integer or false/true. input is:" + param); - } - if (reconnect < 0) { - throw new IllegalArgumentException("reconnect param must be nonnegative integer or false/true. input is:" + param); - } - } - return reconnect; - } - - /** - * init reconnect thread - */ - private synchronized void initConnectStatusCheckCommand() { - //reconnect=false to close reconnect - int reconnect = getReconnectParam(getUrl()); - if (reconnect > 0 && (reconnectExecutorFuture == null || reconnectExecutorFuture.isCancelled())) { - Runnable connectStatusCheckCommand = new Runnable() { - @Override - public void run() { - try { - if (cancelFutureIfOffline()) return; - - if (!isConnected()) { - connect(); - } else { - lastConnectedTime = System.currentTimeMillis(); - } - } catch (Throwable t) { - String errorMsg = "client reconnect to " + getUrl().getAddress() + " find error . url: " + getUrl(); - // wait registry sync provider list - if (System.currentTimeMillis() - lastConnectedTime > shutdown_timeout) { - if (!reconnect_error_log_flag.get()) { - reconnect_error_log_flag.set(true); - logger.error(errorMsg, t); - return; - } - } - if (reconnect_count.getAndIncrement() % reconnect_warning_period == 0) { - logger.warn(errorMsg, t); - } - } - } - - private boolean cancelFutureIfOffline() { - /** - * If the provider service is detected offline, - * the client should not attempt to connect again. - * - * issue: https://github.com/apache/incubator-dubbo/issues/3158 - */ - if(isClosed()) { - ScheduledFuture future = reconnectExecutorFuture; - if(future != null && !future.isCancelled()){ - /** - * Client has been destroyed and - * scheduled task should be cancelled. - */ - future.cancel(true); - } - return true; - } - return false; - } - }; - - reconnectExecutorFuture = reconnectExecutorService.scheduleWithFixedDelay(connectStatusCheckCommand, reconnect, reconnect, TimeUnit.MILLISECONDS); - } - } - - private synchronized void destroyConnectStatusCheckCommand() { - try { - if (reconnectExecutorFuture != null && !reconnectExecutorFuture.isDone()) { - reconnectExecutorFuture.cancel(true); - reconnectExecutorService.purge(); - } - } catch (Throwable e) { - logger.warn(e.getMessage(), e); - } - } - - protected ExecutorService createExecutor() { - return Executors.newCachedThreadPool(new NamedThreadFactory(CLIENT_THREAD_POOL_NAME + CLIENT_THREAD_POOL_ID.incrementAndGet() + "-" + getUrl().getAddress(), true)); - } - public InetSocketAddress getConnectAddress() { return new InetSocketAddress(NetUtils.filterLocalHost(getUrl().getHost()), getUrl().getPort()); } @@ -286,7 +162,7 @@ public boolean hasAttribute(String key) { @Override public void send(Object message, boolean sent) throws RemotingException { - if (send_reconnect && !isConnected()) { + if (needReconnect && !isConnected()) { connect(); } Channel channel = getChannel(); @@ -303,7 +179,7 @@ protected void connect() throws RemotingException { if (isConnected()) { return; } - initConnectStatusCheckCommand(); + doConnect(); if (!isConnected()) { throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " " @@ -316,8 +192,6 @@ protected void connect() throws RemotingException { + ", channel is " + this.getChannel()); } } - reconnect_count.set(0); - reconnect_error_log_flag.set(false); } catch (RemotingException e) { throw e; } catch (Throwable e) { @@ -332,7 +206,6 @@ protected void connect() throws RemotingException { public void disconnect() { connectLock.lock(); try { - destroyConnectStatusCheckCommand(); try { Channel channel = getChannel(); if (channel != null) { diff --git a/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeartbeatHandlerTest.java b/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeartbeatHandlerTest.java index 13b1f99acad..52c3e9d03e7 100644 --- a/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeartbeatHandlerTest.java +++ b/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeartbeatHandlerTest.java @@ -53,6 +53,9 @@ public void after() throws Exception { server.close(); server = null; } + + // wait for timer to finish + Thread.sleep(2000); } @Test diff --git a/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/ClientReconnectTest.java b/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/ClientReconnectTest.java index 16160b40ae7..4cb7eb6784d 100644 --- a/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/ClientReconnectTest.java +++ b/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/ClientReconnectTest.java @@ -18,7 +18,6 @@ import org.apache.dubbo.common.Constants; import org.apache.dubbo.common.utils.DubboAppender; -import org.apache.dubbo.common.utils.LogUtil; import org.apache.dubbo.common.utils.NetUtils; import org.apache.dubbo.remoting.Channel; import org.apache.dubbo.remoting.Client; @@ -27,7 +26,6 @@ import org.apache.dubbo.remoting.exchange.Exchangers; import org.apache.dubbo.remoting.exchange.support.ExchangeHandlerAdapter; -import org.apache.log4j.Level; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -36,9 +34,6 @@ * Client reconnect test */ public class ClientReconnectTest { - public static void main(String[] args) { - System.out.println(3 % 1); - } @BeforeEach public void clear() { @@ -73,31 +68,9 @@ public void testReconnect() throws RemotingException, InterruptedException { } } - /** - * Reconnect log check, when the time is not enough for shutdown time, there is no error log, but there must be a warn log - */ - @Test - public void testReconnectWarnLog() throws RemotingException, InterruptedException { - int port = NetUtils.getAvailablePort(); - DubboAppender.doStart(); - String url = "exchange://127.0.0.1:" + port + "/client.reconnect.test?check=false&client=netty3&" - + Constants.RECONNECT_KEY + "=" + 1; //1ms reconnect, ensure that there is enough frequency to reconnect - try { - Exchangers.connect(url); - } catch (Exception e) { - - //do nothing - } - Thread.sleep(1500); - //Time is not long enough to produce a error log - Assertions.assertEquals(0, LogUtil.findMessage(Level.ERROR, "client reconnect to "), "no error message "); - //The first reconnection failed to have a warn log - Assertions.assertEquals(1, LogUtil.findMessage(Level.WARN, "client reconnect to "), "must have one warn message "); - DubboAppender.doStop(); - } - public Client startClient(int port, int reconnectPeriod) throws RemotingException { - final String url = "exchange://127.0.0.1:" + port + "/client.reconnect.test?check=false&client=netty3&" + Constants.RECONNECT_KEY + "=" + reconnectPeriod; + public Client startClient(int port, int heartbeat) throws RemotingException { + final String url = "exchange://127.0.0.1:" + port + "/client.reconnect.test?check=false&client=netty3&" + Constants.HEARTBEAT_KEY + "=" + heartbeat; return Exchangers.connect(url); } diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClient.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClient.java index 4ea6fe12c52..464087e7c23 100644 --- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClient.java +++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClient.java @@ -22,6 +22,7 @@ import org.apache.dubbo.common.logger.Logger; import org.apache.dubbo.common.logger.LoggerFactory; import org.apache.dubbo.common.utils.NetUtils; +import org.apache.dubbo.common.utils.UrlUtils; import org.apache.dubbo.remoting.ChannelHandler; import org.apache.dubbo.remoting.RemotingException; import org.apache.dubbo.remoting.transport.AbstractClient; @@ -34,9 +35,10 @@ import io.netty.channel.ChannelOption; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.concurrent.DefaultThreadFactory; -import java.util.concurrent.TimeUnit; +import static java.util.concurrent.TimeUnit.MILLISECONDS; /** * NettyClient. @@ -76,10 +78,12 @@ protected void doOpen() throws Throwable { @Override protected void initChannel(Channel ch) throws Exception { + int heartbeatInterval = UrlUtils.getIdleTimeout(getUrl()); NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this); ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug .addLast("decoder", adapter.getDecoder()) .addLast("encoder", adapter.getEncoder()) + .addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS)) .addLast("handler", nettyClientHandler); } }); @@ -90,7 +94,7 @@ protected void doConnect() throws Throwable { long start = System.currentTimeMillis(); ChannelFuture future = bootstrap.connect(getConnectAddress()); try { - boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS); + boolean ret = future.awaitUninterruptibly(getConnectTimeout(), MILLISECONDS); if (ret && future.isSuccess()) { Channel newChannel = future.channel(); @@ -162,4 +166,8 @@ protected org.apache.dubbo.remoting.Channel getChannel() { return NettyChannel.getOrAddChannel(c, getUrl(), this); } + @Override + public boolean canHandleIdle() { + return true; + } } diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClientHandler.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClientHandler.java index 265d6ef6676..c7077086dbb 100644 --- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClientHandler.java +++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClientHandler.java @@ -16,20 +16,26 @@ */ package org.apache.dubbo.remoting.transport.netty4; -import io.netty.channel.ChannelDuplexHandler; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelPromise; import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.Version; +import org.apache.dubbo.common.logger.Logger; +import org.apache.dubbo.common.logger.LoggerFactory; import org.apache.dubbo.common.utils.StringUtils; import org.apache.dubbo.remoting.ChannelHandler; import org.apache.dubbo.remoting.exchange.Request; import org.apache.dubbo.remoting.exchange.Response; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import io.netty.handler.timeout.IdleStateEvent; + /** * NettyClientHandler */ @io.netty.channel.ChannelHandler.Sharable public class NettyClientHandler extends ChannelDuplexHandler { + private static final Logger logger = LoggerFactory.getLogger(NettyClient.class); private final URL url; @@ -105,6 +111,27 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) }); } + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof IdleStateEvent) { + try { + NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler); + if (logger.isDebugEnabled()) { + logger.debug("IdleStateEvent triggered, send heartbeat to channel " + channel); + } + Request req = new Request(); + req.setVersion(Version.getProtocolVersion()); + req.setTwoWay(true); + req.setEvent(Request.HEARTBEAT_EVENT); + channel.send(req); + } finally { + NettyChannel.removeChannelIfDisconnected(ctx.channel()); + } + } else { + super.userEventTriggered(ctx, evt); + } + } + @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { @@ -129,4 +156,4 @@ private static Response buildErrorResponse(Request request, Throwable t) { response.setErrorMessage(StringUtils.toString(t)); return response; } -} \ No newline at end of file +} diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServer.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServer.java index a8da94e05bd..62b6c55f93c 100644 --- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServer.java +++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServer.java @@ -22,6 +22,7 @@ import org.apache.dubbo.common.logger.LoggerFactory; import org.apache.dubbo.common.utils.ExecutorUtil; import org.apache.dubbo.common.utils.NetUtils; +import org.apache.dubbo.common.utils.UrlUtils; import org.apache.dubbo.remoting.Channel; import org.apache.dubbo.remoting.ChannelHandler; import org.apache.dubbo.remoting.RemotingException; @@ -38,6 +39,7 @@ import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.concurrent.DefaultThreadFactory; import java.net.InetSocketAddress; @@ -45,6 +47,8 @@ import java.util.HashSet; import java.util.Map; +import static java.util.concurrent.TimeUnit.MILLISECONDS; + /** * NettyServer */ @@ -84,10 +88,13 @@ protected void doOpen() throws Throwable { .childHandler(new ChannelInitializer() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { + // FIXME: should we use getTimeout()? + int idleTimeout = UrlUtils.getIdleTimeout(getUrl()); NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug .addLast("decoder", adapter.getDecoder()) .addLast("encoder", adapter.getEncoder()) + .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS)) .addLast("handler", nettyServerHandler); } }); @@ -157,6 +164,11 @@ public Channel getChannel(InetSocketAddress remoteAddress) { return channels.get(NetUtils.toAddressString(remoteAddress)); } + @Override + public boolean canHandleIdle() { + return true; + } + @Override public boolean isBound() { return channel.isActive(); diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServerHandler.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServerHandler.java index 56d4ff69c99..ea5b3249839 100644 --- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServerHandler.java +++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServerHandler.java @@ -17,6 +17,8 @@ package org.apache.dubbo.remoting.transport.netty4; import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.logger.Logger; +import org.apache.dubbo.common.logger.LoggerFactory; import org.apache.dubbo.common.utils.NetUtils; import org.apache.dubbo.remoting.Channel; import org.apache.dubbo.remoting.ChannelHandler; @@ -24,6 +26,7 @@ import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; +import io.netty.handler.timeout.IdleStateEvent; import java.net.InetSocketAddress; import java.util.Map; @@ -34,6 +37,7 @@ */ @io.netty.channel.ChannelHandler.Sharable public class NettyServerHandler extends ChannelDuplexHandler { + private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class); private final Map channels = new ConcurrentHashMap(); // @@ -102,6 +106,20 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) } } + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof IdleStateEvent) { + NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler); + try { + logger.info("IdleStateEvent triggered, close channel " + channel); + channel.close(); + } finally { + NettyChannel.removeChannelIfDisconnected(ctx.channel()); + } + } + super.userEventTriggered(ctx, evt); + } + @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { @@ -112,4 +130,4 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) NettyChannel.removeChannelIfDisconnected(ctx.channel()); } } -} \ No newline at end of file +} diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/ClientReconnectTest.java b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/ClientReconnectTest.java index eae66742767..f21918d3d15 100644 --- a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/ClientReconnectTest.java +++ b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/ClientReconnectTest.java @@ -71,8 +71,8 @@ public void testReconnect() throws RemotingException, InterruptedException { } - public Client startClient(int port, int reconnectPeriod) throws RemotingException { - final String url = "exchange://127.0.0.1:" + port + "/client.reconnect.test?client=netty4&check=false&" + Constants.RECONNECT_KEY + "=" + reconnectPeriod; + public Client startClient(int port, int heartbeat) throws RemotingException { + final String url = "exchange://127.0.0.1:" + port + "/client.reconnect.test?client=netty4&check=false&" + Constants.HEARTBEAT_KEY + "=" + heartbeat; return Exchangers.connect(url); }