Skip to content

Commit

Permalink
[common] feature:add kafka common queue (#966)
Browse files Browse the repository at this point in the history
  • Loading branch information
baiban114 authored May 19, 2023
1 parent 0ff8a1c commit 23b4dde
Show file tree
Hide file tree
Showing 8 changed files with 426 additions and 0 deletions.
5 changes: 5 additions & 0 deletions common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -102,5 +102,10 @@
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</dependency>
<!-- kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
package org.dromara.hertzbeat.common.queue.impl;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.dromara.hertzbeat.common.entity.alerter.Alert;
import org.dromara.hertzbeat.common.entity.message.CollectRep;
import org.dromara.hertzbeat.common.queue.CommonDataQueue;
import org.dromara.hertzbeat.common.serialize.AlertDeserializer;
import org.dromara.hertzbeat.common.serialize.AlertSerializer;
import org.dromara.hertzbeat.common.serialize.KafkaMetricsDataDeserializer;
import org.dromara.hertzbeat.common.serialize.KafkaMetricsDataSerializer;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.time.Duration;
import java.util.*;


/**
* kafka采集数据队列实现
*
*
*/
@Configuration
@ConditionalOnProperty(prefix = "common.queue", name = "type", havingValue = "kafka",
matchIfMissing = false)
@Slf4j
public class KafkaCommonDataQueue implements CommonDataQueue, DisposableBean {

private KafkaProducer<Long, CollectRep.MetricsData> metricsDataProducer;
private KafkaProducer<Long, Alert> kafkaAlertProducer;
private KafkaConsumer<Long, Alert> alertConsumer;
private KafkaConsumer<Long, CollectRep.MetricsData> metricsDataToAlertConsumer;
private KafkaConsumer<Long, CollectRep.MetricsData> metricsDataToPersistentStorageConsumer;
private KafkaConsumer<Long, CollectRep.MetricsData> metricsDataToMemoryStorageConsumer;
@Autowired
private KafkaProperties kafka;

@PostConstruct
public void initDataQueue(){
try {
Map<String, Object> producerConfig = new HashMap<String, Object>(3);
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getServers());
producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
producerConfig.put(ProducerConfig.RETRIES_CONFIG, 3);
metricsDataProducer = new KafkaProducer<>(producerConfig, new LongSerializer(), new KafkaMetricsDataSerializer());
kafkaAlertProducer = new KafkaProducer<>(producerConfig, new LongSerializer(), new AlertSerializer());

Map<String, Object> consumerConfig = new HashMap<String, Object>(4);
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,kafka.getServers());
consumerConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");
consumerConfig.put("group.id", "default-consumer");
consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

alertConsumer = new KafkaConsumer<>(consumerConfig, new LongDeserializer(), new AlertDeserializer());

metricsDataToAlertConsumer = new KafkaConsumer<>(consumerConfig, new LongDeserializer(), new KafkaMetricsDataDeserializer());
metricsDataToMemoryStorageConsumer = new KafkaConsumer<>(consumerConfig, new LongDeserializer(), new KafkaMetricsDataDeserializer());
metricsDataToPersistentStorageConsumer = new KafkaConsumer<>(consumerConfig, new LongDeserializer(), new KafkaMetricsDataDeserializer());

alertConsumer.subscribe(Collections.singletonList(kafka.getAlertTopic()));
metricsDataToAlertConsumer.subscribe(Collections.singletonList(kafka.getAlertMetricTopic()));
metricsDataToPersistentStorageConsumer.subscribe(Collections.singletonList(kafka.getPersistentStorageTopic()));
metricsDataToMemoryStorageConsumer.subscribe(Collections.singletonList(kafka.getRealTimeMemoryStorageTopic()));
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}

@Override
public void addAlertData(Alert alert) {
if (kafkaAlertProducer != null) {
kafkaAlertProducer.send(new ProducerRecord<>(kafka.getAlertTopic(), alert));
} else {
log.error("kafkaAlertProducer is not enable");
}
}

@Override
public Alert pollAlertData() throws InterruptedException {
Alert alert = null;
try {
ConsumerRecords<Long, Alert> records = alertConsumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<Long, Alert> record : records) {
alert = record.value();
}
alertConsumer.commitAsync();
}catch (ConcurrentModificationException e){
//kafka多线程下线程不安全异常
}
return alert;
}

@Override
public CollectRep.MetricsData pollAlertMetricsData() throws InterruptedException {
CollectRep.MetricsData metricsData = null;
try {
ConsumerRecords<Long, CollectRep.MetricsData> records = metricsDataToAlertConsumer.poll(Duration.ofSeconds(1));
for ( ConsumerRecord<Long, CollectRep.MetricsData> record : records) {
metricsData = record.value();
}
metricsDataToAlertConsumer.commitAsync();
}catch (ConcurrentModificationException e){
//kafka多线程下线程不安全异常
}
return metricsData;
}

@Override
public CollectRep.MetricsData pollPersistentStorageMetricsData() throws InterruptedException {
CollectRep.MetricsData persistentStorageMetricsData = null;
try {
ConsumerRecords<Long, CollectRep.MetricsData> records = metricsDataToPersistentStorageConsumer.poll(Duration.ofSeconds(1));
for ( ConsumerRecord<Long, CollectRep.MetricsData> record : records) {
persistentStorageMetricsData = record.value();
}
metricsDataToPersistentStorageConsumer.commitAsync();
}catch (ConcurrentModificationException e){
//kafka多线程下线程不安全异常
}
return persistentStorageMetricsData;
}

@Override
public CollectRep.MetricsData pollRealTimeStorageMetricsData() throws InterruptedException {
CollectRep.MetricsData memoryMetricsData = null;
try {
ConsumerRecords<Long, CollectRep.MetricsData> records = metricsDataToMemoryStorageConsumer.poll(Duration.ofSeconds(1));
for ( ConsumerRecord<Long, CollectRep.MetricsData> record : records) {
memoryMetricsData = record.value();
}
metricsDataToMemoryStorageConsumer.commitAsync();
}catch (ConcurrentModificationException e){
//kafka多线程下线程不安全异常
}
return memoryMetricsData;
}

@Override
public void sendMetricsData(CollectRep.MetricsData metricsData) {
if (metricsDataProducer != null) {
metricsDataProducer.send(new ProducerRecord<>(kafka.getAlertMetricTopic(), metricsData));
metricsDataProducer.send(new ProducerRecord<>(kafka.getPersistentStorageTopic(), metricsData));
metricsDataProducer.send(new ProducerRecord<>(kafka.getRealTimeMemoryStorageTopic(), metricsData));
} else {
log.error("metricsDataProducer is not enabled");
}

}

@Override
public void destroy() throws Exception {
if (metricsDataProducer != null) {
metricsDataProducer.close();
}
if (kafkaAlertProducer != null) {
kafkaAlertProducer.close();
}
if (alertConsumer != null) {
alertConsumer.close();
}
if (metricsDataToAlertConsumer != null) {
metricsDataToAlertConsumer.close();
}
if (metricsDataToPersistentStorageConsumer != null) {
metricsDataToPersistentStorageConsumer.close();
}
if (metricsDataToMemoryStorageConsumer != null) {
metricsDataToMemoryStorageConsumer.close();
}
}
@Component
@ConfigurationProperties("common.queue.kafka")
public static class KafkaProperties {
/**
* kafka的连接服务器url
*/
private String servers;
/**
* 接收数据的topic名称
*/
private String alertTopic;
private String alertMetricTopic;
private String persistentStorageTopic;
private String realTimeMemoryStorageTopic;
/**
* 消费者组ID
*/
private String groupId;
public String getServers() {
return servers;
}

public void setServers(String servers) {
this.servers = servers;
}

public String getAlertTopic() {
return alertTopic;
}

public void setAlertTopic(String alertTopic) {
this.alertTopic = alertTopic;
}

public String getAlertMetricTopic() {
return alertMetricTopic;
}

public void setAlertMetricTopic(String alertMetricTopic) {
this.alertMetricTopic = alertMetricTopic;
}

public String getPersistentStorageTopic() {
return persistentStorageTopic;
}

public void setPersistentStorageTopic(String persistentStorageTopic) {
this.persistentStorageTopic = persistentStorageTopic;
}

public String getRealTimeMemoryStorageTopic() {
return realTimeMemoryStorageTopic;
}

public void setRealTimeMemoryStorageTopic(String realTimeMemoryStorageTopic) {
this.realTimeMemoryStorageTopic = realTimeMemoryStorageTopic;
}

public String getGroupId() {
return groupId;
}

public void setGroupId(String groupId) {
this.groupId = groupId;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package org.dromara.hertzbeat.common.serialize;

import org.dromara.hertzbeat.common.entity.alerter.Alert;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import org.dromara.hertzbeat.common.util.JsonUtil;

import java.util.Map;

/**
* kafka告警记录反序列化类
*
*/
public class AlertDeserializer implements Deserializer {
@Override
public void configure(Map configs, boolean isKey) {
Deserializer.super.configure(configs, isKey);
}

@Override
public Object deserialize(String s, byte[] bytes) {
return JsonUtil.fromJson(new String(bytes), Alert.class);
}

@Override
public Object deserialize(String topic, Headers headers, byte[] data) {
return Deserializer.super.deserialize(topic, headers, data);
}

@Override
public void close() {
Deserializer.super.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package org.dromara.hertzbeat.common.serialize;


import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serializer;
import org.dromara.hertzbeat.common.entity.alerter.Alert;
import org.dromara.hertzbeat.common.util.JsonUtil;

import java.util.Map;

/**
* kafka告警记录序列化类
*
*/
public class AlertSerializer implements Serializer<Alert> {

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
Serializer.super.configure(configs, isKey);
}

@Override
public byte[] serialize(String s, Alert alert) {
if (alert == null){
return null;
}
return JsonUtil.toJson(alert).getBytes();
}

@Override
public byte[] serialize(String topic, Headers headers, Alert data) {
return Serializer.super.serialize(topic, headers, data);
}

@Override
public void close() {
Serializer.super.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package org.dromara.hertzbeat.common.serialize;

import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import org.dromara.hertzbeat.common.entity.message.CollectRep;

import java.util.Map;

/**
* kafka指标组监控数据反序列化类
*
*/
public class KafkaMetricsDataDeserializer implements Deserializer<CollectRep.MetricsData> {

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
Deserializer.super.configure(configs, isKey);
}

@Override
public CollectRep.MetricsData deserialize(String s, byte[] bytes){
try {
return CollectRep.MetricsData.parseFrom(bytes);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
}

@Override
public CollectRep.MetricsData deserialize(String topic, Headers headers, byte[] data) {
return Deserializer.super.deserialize(topic, headers, data);
}

@Override
public void close() {
Deserializer.super.close();
}
}
Loading

0 comments on commit 23b4dde

Please sign in to comment.