Skip to content

Commit

Permalink
bugfix collector can not auto reconnect when channel idle (#1259)
Browse files Browse the repository at this point in the history
  • Loading branch information
tomsun28 authored Sep 24, 2023
1 parent 376c695 commit b68c64e
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,12 +132,17 @@ public void onChannelActive(Channel channel) {
scheduledExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory);
// schedule send heartbeat message
scheduledExecutor.scheduleAtFixedRate(() -> {
ClusterMsg.Message heartbeat = ClusterMsg.Message.newBuilder()
.setIdentity(identity)
.setType(ClusterMsg.MessageType.HEARTBEAT)
.build();
CollectServer.this.sendMsg(heartbeat);
log.info("collector send cluster server heartbeat, time: {}.", System.currentTimeMillis());
try {
ClusterMsg.Message heartbeat = ClusterMsg.Message.newBuilder()
.setIdentity(identity)
.setDirection(ClusterMsg.Direction.REQUEST)
.setType(ClusterMsg.MessageType.HEARTBEAT)
.build();
CollectServer.this.sendMsg(heartbeat);
log.info("collector send cluster server heartbeat, time: {}.", System.currentTimeMillis());
} catch (Exception e) {
log.error("schedule send heartbeat to server error.{}", e.getMessage());
}
}, 5, 5, TimeUnit.SECONDS);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ public static class ServerProperties {
private boolean enabled = true;

private int port = 1158;

/**
* an IdleStateEvent whose state is IdleState.ALL_IDLE will be triggered when neither read nor write
* was performed for the specified period of time.
* unit: s
*/
private int idleStateEventTriggerTime = 100;

public boolean isEnabled() {
return enabled;
Expand All @@ -42,6 +49,14 @@ public int getPort() {
public void setPort(int port) {
this.port = port;
}

public int getIdleStateEventTriggerTime() {
return idleStateEventTriggerTime;
}

public void setIdleStateEventTriggerTime(int idleStateEventTriggerTime) {
this.idleStateEventTriggerTime = idleStateEventTriggerTime;
}
}

}
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package org.dromara.hertzbeat.manager.scheduler.netty;

import com.google.common.collect.Lists;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.dromara.hertzbeat.common.entity.message.ClusterMsg;
import org.dromara.hertzbeat.common.support.CommonThreadPool;
Expand All @@ -15,7 +13,6 @@
import org.dromara.hertzbeat.manager.scheduler.SchedulerProperties;
import org.dromara.hertzbeat.remoting.RemotingServer;
import org.dromara.hertzbeat.remoting.event.NettyEventListener;
import org.dromara.hertzbeat.remoting.netty.NettyHook;
import org.dromara.hertzbeat.remoting.netty.NettyRemotingServer;
import org.dromara.hertzbeat.remoting.netty.NettyServerConfig;
import org.springframework.boot.CommandLineRunner;
Expand Down Expand Up @@ -59,17 +56,10 @@ public ManageServer(final SchedulerProperties schedulerProperties,
private void init(final SchedulerProperties schedulerProperties, final CommonThreadPool threadPool) {
NettyServerConfig nettyServerConfig = new NettyServerConfig();
nettyServerConfig.setPort(schedulerProperties.getServer().getPort());
nettyServerConfig.setIdleStateEventTriggerTime(schedulerProperties.getServer().getIdleStateEventTriggerTime());
NettyEventListener nettyEventListener = new ManageNettyEventListener();
this.remotingServer = new NettyRemotingServer(nettyServerConfig, nettyEventListener, threadPool);

// register hook
this.remotingServer.registerHook(Lists.newArrayList(new NettyHook() {
@Override
public void doBeforeRequest(ChannelHandlerContext ctx, ClusterMsg.Message message) {
ManageServer.this.clientChannelTable.put(message.getIdentity(), ctx.channel());
}
}));


// register processor
this.remotingServer.registerProcessor(ClusterMsg.MessageType.HEARTBEAT, new HeartbeatProcessor(this));
this.remotingServer.registerProcessor(ClusterMsg.MessageType.GO_ONLINE, new CollectorOnlineProcessor(this));
Expand All @@ -84,13 +74,17 @@ public void start() {
this.remotingServer.start();

this.channelSchedule.scheduleAtFixedRate(() -> {
ManageServer.this.clientChannelTable.forEach((collector, channel) -> {
if (!channel.isActive()) {
channel.closeFuture();
ManageServer.this.clientChannelTable.remove(collector);
ManageServer.this.collectorAndJobScheduler.collectorGoOffline(collector);
}
});
try {
this.clientChannelTable.forEach((collector, channel) -> {
if (!channel.isActive()) {
channel.closeFuture();
this.clientChannelTable.remove(collector);
this.collectorAndJobScheduler.collectorGoOffline(collector);
}
});
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}, 10, 3, TimeUnit.SECONDS);
}

Expand All @@ -113,6 +107,14 @@ public Channel getChannel(final String identity) {
return channel;
}

public void addChannel(final String identity, Channel channel) {
Channel preChannel = this.clientChannelTable.get(identity);
if (preChannel != null && channel.isActive()) {
preChannel.close();
}
this.clientChannelTable.put(identity, channel);
}

public void closeChannel(final String identity) {
Channel channel = this.getChannel(identity);
if (channel != null) {
Expand Down Expand Up @@ -165,6 +167,7 @@ public void onChannelIdle(Channel channel) {
if (identity != null) {
ManageServer.this.clientChannelTable.remove(identity);
ManageServer.this.collectorAndJobScheduler.collectorGoOffline(identity);
channel.close();
log.info("handle idle event triggered. the client {} is going offline.", identity);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public ClusterMsg.Message handle(ChannelHandlerContext ctx, ClusterMsg.Message m
String collector = message.getIdentity();
log.info("the collector {} actively requests to go online.", collector);
CollectorInfo collectorInfo = JsonUtil.fromJson(message.getMsg(), CollectorInfo.class);
this.manageServer.addChannel(collector, ctx.channel());
this.manageServer.getCollectorAndJobScheduler().collectorGoOnline(collector, collectorInfo);
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,10 @@ public ClusterMsg.Message handle(ChannelHandlerContext ctx, ClusterMsg.Message m
String identity = message.getIdentity();
boolean isChannelExist = this.manageServer.isChannelExist(identity);
if (!isChannelExist) {
log.info("the collector {} has reconnected and to go online.", identity);
this.manageServer.getCollectorAndJobScheduler().collectorGoOnline(identity, null);
log.info("the collector {} is not online.", identity);
}
if (log.isDebugEnabled()) {
log.debug("server receive collector heartbeat");
log.debug("server receive collector {} heartbeat", message.getIdentity());
}
return ClusterMsg.Message.newBuilder()
.setType(ClusterMsg.MessageType.HEARTBEAT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ private void initChannel(final SocketChannel channel) {
pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
pipeline.addLast(new ProtobufEncoder());
// idle state
pipeline.addLast(new IdleStateHandler(0, 0, 30));
pipeline.addLast(new IdleStateHandler(0, 0, nettyServerConfig.getIdleStateEventTriggerTime()));
pipeline.addLast(new NettyServerHandler());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,6 @@ public class NettyServerConfig {

private Integer port;

private Integer idleStateEventTriggerTime = 100;

}

0 comments on commit b68c64e

Please sign in to comment.