diff --git a/collector/src/main/java/org/dromara/hertzbeat/collector/dispatch/entrance/CollectServer.java b/collector/src/main/java/org/dromara/hertzbeat/collector/dispatch/entrance/CollectServer.java index 9f6876d0bfd..bb88fac962c 100644 --- a/collector/src/main/java/org/dromara/hertzbeat/collector/dispatch/entrance/CollectServer.java +++ b/collector/src/main/java/org/dromara/hertzbeat/collector/dispatch/entrance/CollectServer.java @@ -132,12 +132,17 @@ public void onChannelActive(Channel channel) { scheduledExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory); // schedule send heartbeat message scheduledExecutor.scheduleAtFixedRate(() -> { - ClusterMsg.Message heartbeat = ClusterMsg.Message.newBuilder() - .setIdentity(identity) - .setType(ClusterMsg.MessageType.HEARTBEAT) - .build(); - CollectServer.this.sendMsg(heartbeat); - log.info("collector send cluster server heartbeat, time: {}.", System.currentTimeMillis()); + try { + ClusterMsg.Message heartbeat = ClusterMsg.Message.newBuilder() + .setIdentity(identity) + .setDirection(ClusterMsg.Direction.REQUEST) + .setType(ClusterMsg.MessageType.HEARTBEAT) + .build(); + CollectServer.this.sendMsg(heartbeat); + log.info("collector send cluster server heartbeat, time: {}.", System.currentTimeMillis()); + } catch (Exception e) { + log.error("schedule send heartbeat to server error.{}", e.getMessage()); + } }, 5, 5, TimeUnit.SECONDS); } } diff --git a/manager/src/main/java/org/dromara/hertzbeat/manager/scheduler/SchedulerProperties.java b/manager/src/main/java/org/dromara/hertzbeat/manager/scheduler/SchedulerProperties.java index 454155e845e..a094b3081df 100644 --- a/manager/src/main/java/org/dromara/hertzbeat/manager/scheduler/SchedulerProperties.java +++ b/manager/src/main/java/org/dromara/hertzbeat/manager/scheduler/SchedulerProperties.java @@ -26,6 +26,13 @@ public static class ServerProperties { private boolean enabled = true; private int port = 1158; + + /** + * an IdleStateEvent whose state is IdleState.ALL_IDLE will be triggered when neither read nor write + * was performed for the specified period of time. + * unit: s + */ + private int idleStateEventTriggerTime = 100; public boolean isEnabled() { return enabled; @@ -42,6 +49,14 @@ public int getPort() { public void setPort(int port) { this.port = port; } + + public int getIdleStateEventTriggerTime() { + return idleStateEventTriggerTime; + } + + public void setIdleStateEventTriggerTime(int idleStateEventTriggerTime) { + this.idleStateEventTriggerTime = idleStateEventTriggerTime; + } } } diff --git a/manager/src/main/java/org/dromara/hertzbeat/manager/scheduler/netty/ManageServer.java b/manager/src/main/java/org/dromara/hertzbeat/manager/scheduler/netty/ManageServer.java index d52fe527396..3111ed7ba9e 100644 --- a/manager/src/main/java/org/dromara/hertzbeat/manager/scheduler/netty/ManageServer.java +++ b/manager/src/main/java/org/dromara/hertzbeat/manager/scheduler/netty/ManageServer.java @@ -1,8 +1,6 @@ package org.dromara.hertzbeat.manager.scheduler.netty; -import com.google.common.collect.Lists; import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerContext; import lombok.extern.slf4j.Slf4j; import org.dromara.hertzbeat.common.entity.message.ClusterMsg; import org.dromara.hertzbeat.common.support.CommonThreadPool; @@ -15,7 +13,6 @@ import org.dromara.hertzbeat.manager.scheduler.SchedulerProperties; import org.dromara.hertzbeat.remoting.RemotingServer; import org.dromara.hertzbeat.remoting.event.NettyEventListener; -import org.dromara.hertzbeat.remoting.netty.NettyHook; import org.dromara.hertzbeat.remoting.netty.NettyRemotingServer; import org.dromara.hertzbeat.remoting.netty.NettyServerConfig; import org.springframework.boot.CommandLineRunner; @@ -59,17 +56,10 @@ public ManageServer(final SchedulerProperties schedulerProperties, private void init(final SchedulerProperties schedulerProperties, final CommonThreadPool threadPool) { NettyServerConfig nettyServerConfig = new NettyServerConfig(); nettyServerConfig.setPort(schedulerProperties.getServer().getPort()); + nettyServerConfig.setIdleStateEventTriggerTime(schedulerProperties.getServer().getIdleStateEventTriggerTime()); NettyEventListener nettyEventListener = new ManageNettyEventListener(); this.remotingServer = new NettyRemotingServer(nettyServerConfig, nettyEventListener, threadPool); - - // register hook - this.remotingServer.registerHook(Lists.newArrayList(new NettyHook() { - @Override - public void doBeforeRequest(ChannelHandlerContext ctx, ClusterMsg.Message message) { - ManageServer.this.clientChannelTable.put(message.getIdentity(), ctx.channel()); - } - })); - + // register processor this.remotingServer.registerProcessor(ClusterMsg.MessageType.HEARTBEAT, new HeartbeatProcessor(this)); this.remotingServer.registerProcessor(ClusterMsg.MessageType.GO_ONLINE, new CollectorOnlineProcessor(this)); @@ -84,13 +74,17 @@ public void start() { this.remotingServer.start(); this.channelSchedule.scheduleAtFixedRate(() -> { - ManageServer.this.clientChannelTable.forEach((collector, channel) -> { - if (!channel.isActive()) { - channel.closeFuture(); - ManageServer.this.clientChannelTable.remove(collector); - ManageServer.this.collectorAndJobScheduler.collectorGoOffline(collector); - } - }); + try { + this.clientChannelTable.forEach((collector, channel) -> { + if (!channel.isActive()) { + channel.closeFuture(); + this.clientChannelTable.remove(collector); + this.collectorAndJobScheduler.collectorGoOffline(collector); + } + }); + } catch (Exception e) { + log.error(e.getMessage(), e); + } }, 10, 3, TimeUnit.SECONDS); } @@ -113,6 +107,14 @@ public Channel getChannel(final String identity) { return channel; } + public void addChannel(final String identity, Channel channel) { + Channel preChannel = this.clientChannelTable.get(identity); + if (preChannel != null && channel.isActive()) { + preChannel.close(); + } + this.clientChannelTable.put(identity, channel); + } + public void closeChannel(final String identity) { Channel channel = this.getChannel(identity); if (channel != null) { @@ -165,6 +167,7 @@ public void onChannelIdle(Channel channel) { if (identity != null) { ManageServer.this.clientChannelTable.remove(identity); ManageServer.this.collectorAndJobScheduler.collectorGoOffline(identity); + channel.close(); log.info("handle idle event triggered. the client {} is going offline.", identity); } } diff --git a/manager/src/main/java/org/dromara/hertzbeat/manager/scheduler/netty/process/CollectorOnlineProcessor.java b/manager/src/main/java/org/dromara/hertzbeat/manager/scheduler/netty/process/CollectorOnlineProcessor.java index 7e2292654c9..cf58b3edb85 100644 --- a/manager/src/main/java/org/dromara/hertzbeat/manager/scheduler/netty/process/CollectorOnlineProcessor.java +++ b/manager/src/main/java/org/dromara/hertzbeat/manager/scheduler/netty/process/CollectorOnlineProcessor.java @@ -24,6 +24,7 @@ public ClusterMsg.Message handle(ChannelHandlerContext ctx, ClusterMsg.Message m String collector = message.getIdentity(); log.info("the collector {} actively requests to go online.", collector); CollectorInfo collectorInfo = JsonUtil.fromJson(message.getMsg(), CollectorInfo.class); + this.manageServer.addChannel(collector, ctx.channel()); this.manageServer.getCollectorAndJobScheduler().collectorGoOnline(collector, collectorInfo); return null; } diff --git a/manager/src/main/java/org/dromara/hertzbeat/manager/scheduler/netty/process/HeartbeatProcessor.java b/manager/src/main/java/org/dromara/hertzbeat/manager/scheduler/netty/process/HeartbeatProcessor.java index 67e3f1fb0ab..9c0a30116c6 100644 --- a/manager/src/main/java/org/dromara/hertzbeat/manager/scheduler/netty/process/HeartbeatProcessor.java +++ b/manager/src/main/java/org/dromara/hertzbeat/manager/scheduler/netty/process/HeartbeatProcessor.java @@ -23,11 +23,10 @@ public ClusterMsg.Message handle(ChannelHandlerContext ctx, ClusterMsg.Message m String identity = message.getIdentity(); boolean isChannelExist = this.manageServer.isChannelExist(identity); if (!isChannelExist) { - log.info("the collector {} has reconnected and to go online.", identity); - this.manageServer.getCollectorAndJobScheduler().collectorGoOnline(identity, null); + log.info("the collector {} is not online.", identity); } if (log.isDebugEnabled()) { - log.debug("server receive collector heartbeat"); + log.debug("server receive collector {} heartbeat", message.getIdentity()); } return ClusterMsg.Message.newBuilder() .setType(ClusterMsg.MessageType.HEARTBEAT) diff --git a/remoting/src/main/java/org/dromara/hertzbeat/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/dromara/hertzbeat/remoting/netty/NettyRemotingServer.java index 2f2948de834..a2723ac87d4 100644 --- a/remoting/src/main/java/org/dromara/hertzbeat/remoting/netty/NettyRemotingServer.java +++ b/remoting/src/main/java/org/dromara/hertzbeat/remoting/netty/NettyRemotingServer.java @@ -138,7 +138,7 @@ private void initChannel(final SocketChannel channel) { pipeline.addLast(new ProtobufVarint32LengthFieldPrepender()); pipeline.addLast(new ProtobufEncoder()); // idle state - pipeline.addLast(new IdleStateHandler(0, 0, 30)); + pipeline.addLast(new IdleStateHandler(0, 0, nettyServerConfig.getIdleStateEventTriggerTime())); pipeline.addLast(new NettyServerHandler()); } diff --git a/remoting/src/main/java/org/dromara/hertzbeat/remoting/netty/NettyServerConfig.java b/remoting/src/main/java/org/dromara/hertzbeat/remoting/netty/NettyServerConfig.java index 51c3160297a..369117b372e 100644 --- a/remoting/src/main/java/org/dromara/hertzbeat/remoting/netty/NettyServerConfig.java +++ b/remoting/src/main/java/org/dromara/hertzbeat/remoting/netty/NettyServerConfig.java @@ -27,4 +27,6 @@ public class NettyServerConfig { private Integer port; + private Integer idleStateEventTriggerTime = 100; + }