Skip to content

Commit

Permalink
netty as an independent module, add new feature about collector list (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Ceilzcx authored and tomsun28 committed Jan 16, 2024
1 parent 3aa27e6 commit 4d001a2
Show file tree
Hide file tree
Showing 83 changed files with 3,142 additions and 929 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/maven-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@ jobs:
with:
java-version: 11
- name: Build with Maven
run: mvn -B package --file pom.xml
run: mvn clean -B package --file pom.xml
4 changes: 2 additions & 2 deletions alerter/README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
### alerter
### Alerter

Processing data according to threshold rules to determine alarms, and alarm distribution functions.
根据告警规则配置信息,处理指标数据判断告警,告警分发。
根据告警规则配置信息,处理指标数据判断告警,告警分发。
5 changes: 5 additions & 0 deletions collector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@
<groupId>org.dromara.hertzbeat</groupId>
<artifactId>hertzbeat-common</artifactId>
</dependency>
<!-- remoting -->
<dependency>
<groupId>org.dromara.hertzbeat</groupId>
<artifactId>hertzbeat-remoting</artifactId>
</dependency>
<!-- validation -->
<dependency>
<groupId>org.springframework.boot</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -77,14 +78,14 @@ public RocketmqSingleCollectImpl() {
Runtime runtime = Runtime.getRuntime();
int corePoolSize = Math.max(8, runtime.availableProcessors());
int maximumPoolSize = Math.max(16, runtime.availableProcessors());
ThreadFactory threadFactory = new ThreadFactory() {
private final AtomicLong threadIndex = new AtomicLong(0);

@Override
public Thread newThread(@NotNull Runnable r) {
return new Thread(r, "RocketMQCollectGroup_" + this.threadIndex.incrementAndGet());
}
};
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setUncaughtExceptionHandler((thread, throwable) -> {
log.error("RocketMQCollectGroup has uncaughtException.");
log.error(throwable.getMessage(), throwable);
})
.setDaemon(true)
.setNameFormat("rocketMQ-collector-%d")
.build();
this.executorService = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(5000), threadFactory, new ThreadPoolExecutor.DiscardOldestPolicy());
}
Expand Down Expand Up @@ -131,6 +132,7 @@ public String supportProtocol() {

/**
* 采集前置条件, 入参判断
*
* @param metrics 数据指标
*/
private void preCheck(Metrics metrics) {
Expand All @@ -144,6 +146,7 @@ private void preCheck(Metrics metrics) {

/**
* 创建DefaultMQAdminExt实体类; 这里有个小问题, 是否需要每次都重新创建
*
* @param metrics 数据指标
* @return DefaultMQAdminExt
*/
Expand All @@ -162,7 +165,8 @@ private DefaultMQAdminExt createMqAdminExt(Metrics metrics) {

/**
* 采集rocketmq数据
* @param mqAdminExt rocketmq提供的远程调用类
*
* @param mqAdminExt rocketmq提供的远程调用类
* @param rocketmqCollectData rocketmq数据采集类
* @throws Exception 远程调用异常
*/
Expand All @@ -174,7 +178,8 @@ private void collectData(DefaultMQAdminExt mqAdminExt, RocketmqCollectData rocke

/**
* 采集rocketmq的集群数据
* @param mqAdminExt rocketmq提供的远程调用类
*
* @param mqAdminExt rocketmq提供的远程调用类
* @param rocketmqCollectData rocketmq数据采集类
* @throws Exception 远程调用异常
*/
Expand Down Expand Up @@ -244,7 +249,8 @@ private void collectClusterData(DefaultMQAdminExt mqAdminExt, RocketmqCollectDat

/**
* 采集rocketmq的消费者数据
* @param mqAdminExt rocketmq提供的远程调用类
*
* @param mqAdminExt rocketmq提供的远程调用类
* @param rocketmqCollectData rocketmq数据采集类
* @throws Exception 远程调用异常
*/
Expand Down Expand Up @@ -273,8 +279,7 @@ private void collectConsumerData(DefaultMQAdminExt mqAdminExt, RocketmqCollectDa
ConsumeStats consumeStats = null;
try {
consumeStats = mqAdminExt.examineConsumeStats(consumerGroup);
}
catch (Exception e) {
} catch (Exception e) {
log.warn("examineConsumeStats exception to consumerGroup {}, response [{}]", consumerGroup, e.getMessage());
}
if (consumeStats != null) {
Expand All @@ -285,8 +290,7 @@ private void collectConsumerData(DefaultMQAdminExt mqAdminExt, RocketmqCollectDa
ConsumerConnection consumerConnection = null;
try {
consumerConnection = mqAdminExt.examineConsumerConnectionInfo(consumerGroup);
}
catch (Exception e) {
} catch (Exception e) {
log.warn("examineConsumeStats exception to consumerGroup {}, response [{}]", consumerGroup, e.getMessage());
}
if (consumerConnection != null) {
Expand All @@ -312,8 +316,7 @@ private void collectConsumerData(DefaultMQAdminExt mqAdminExt, RocketmqCollectDa
}

/**
*
* @param mqAdminExt rocketmq提供的远程调用类
* @param mqAdminExt rocketmq提供的远程调用类
* @param rocketmqCollectData rocketmq数据采集类
* @throws Exception 远程调用异常
*/
Expand Down Expand Up @@ -343,10 +346,11 @@ private void collectTopicData(DefaultMQAdminExt mqAdminExt, RocketmqCollectData

/**
* 采集数据填充到builder
*
* @param rocketmqCollectData rocketmq数据采集类
* @param builder metrics data builder
* @param aliasFields 字段别名
* @param parseScript JSON的base path
* @param builder metrics data builder
* @param aliasFields 字段别名
* @param parseScript JSON的base path
*/
private void fillBuilder(RocketmqCollectData rocketmqCollectData, CollectRep.MetricsData.Builder builder, List<String> aliasFields, String parseScript) {
String dataJson = JSONObject.toJSONString(rocketmqCollectData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@
/**
* Collection data scheduler interface
* 采集数据调度器接口
*
* @author tomsun28
*
*/
public interface CollectDataDispatch {

Expand Down
Loading

0 comments on commit 4d001a2

Please sign in to comment.