diff --git a/collector/collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/DispatchConstants.java b/collector/collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/DispatchConstants.java
index 8be5eab9bc0..c94016f0e8d 100644
--- a/collector/collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/DispatchConstants.java
+++ b/collector/collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/DispatchConstants.java
@@ -199,4 +199,9 @@ public interface DispatchConstants {
String PARSE_PROM_QL = "PromQL";
String PARSE_PROM_QL_VECTOR = "vector";
String PARSE_PROM_QL_MATRIX = "matrix";
+
+ /**
+ * protocol kafka
+ */
+ String PROTOCOL_KAFKA = "kclient";
}
diff --git a/collector/collector-kafka/pom.xml b/collector/collector-kafka/pom.xml
new file mode 100644
index 00000000000..9acd5c98d94
--- /dev/null
+++ b/collector/collector-kafka/pom.xml
@@ -0,0 +1,50 @@
+
+
+
+ 4.0.0
+
+ org.apache.hertzbeat
+ hertzbeat-collector
+ 2.0-SNAPSHOT
+
+
+ hertzbeat-collector-kafka
+ ${project.artifactId}
+
+
+ 17
+ 17
+ UTF-8
+
+
+
+
+
+ org.apache.hertzbeat
+ hertzbeat-collector-common
+ provided
+
+
+
+ org.apache.kafka
+ kafka-clients
+
+
+
\ No newline at end of file
diff --git a/collector/collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/KafkaCollectImpl.java b/collector/collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/KafkaCollectImpl.java
new file mode 100644
index 00000000000..ac48029bd52
--- /dev/null
+++ b/collector/collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/KafkaCollectImpl.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.collector.collect.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hertzbeat.collector.collect.AbstractCollect;
+import org.apache.hertzbeat.collector.dispatch.DispatchConstants;
+import org.apache.hertzbeat.common.entity.job.Metrics;
+import org.apache.hertzbeat.common.entity.job.protocol.KafkaProtocol;
+import org.apache.hertzbeat.common.entity.message.CollectRep;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
+import org.apache.kafka.clients.admin.ListTopicsOptions;
+import org.apache.kafka.clients.admin.ListTopicsResult;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.springframework.util.Assert;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+@Slf4j
+public class KafkaCollectImpl extends AbstractCollect {
+
+ @Override
+ public void preCheck(Metrics metrics) throws IllegalArgumentException {
+ KafkaProtocol kafkaProtocol = metrics.getKclient();
+ // Ensure that metrics and kafkaProtocol are not null
+ Assert.isTrue(metrics != null && kafkaProtocol != null, "Kafka collect must have kafkaProtocol params");
+ // Ensure that host and port are not empty
+ Assert.hasText(kafkaProtocol.getHost(), "Kafka Protocol host is required.");
+ Assert.hasText(kafkaProtocol.getPort(), "Kafka Protocol port is required.");
+ }
+
+ @Override
+ public void collect(CollectRep.MetricsData.Builder builder, long monitorId, String app, Metrics metrics) {
+ try {
+ KafkaProtocol kafkaProtocol = metrics.getKclient();
+ String command = kafkaProtocol.getCommand();
+ boolean isKafkaCommand = SupportedCommand.isKafkaCommand(command);
+ if (!isKafkaCommand) {
+ log.error("Unsupported command: {}", command);
+ return;
+ }
+
+ // Create AdminClient with the provided host and port
+ AdminClient adminClient = KafkaConnect.getAdminClient(kafkaProtocol.getHost() + ":" + kafkaProtocol.getPort());
+
+ // Execute the appropriate collection method based on the command
+ switch (SupportedCommand.fromCommand(command)) {
+ case TOPIC_DESCRIBE:
+ collectTopicDescribe(builder, adminClient);
+ break;
+ case TOPIC_LIST:
+ collectTopicList(builder, adminClient);
+ break;
+ case TOPIC_OFFSET:
+ collectTopicOffset(builder, adminClient);
+ break;
+ default:
+ log.error("Unsupported command: {}", command);
+ break;
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ log.error("Kafka collect error", e);
+ }
+ }
+
+ /**
+ * Collect the earliest and latest offsets for each topic
+ *
+ * @param builder The MetricsData builder
+ * @param adminClient The AdminClient
+ * @throws InterruptedException If the thread is interrupted
+ * @throws ExecutionException If an error occurs during execution
+ */
+ private void collectTopicOffset(CollectRep.MetricsData.Builder builder, AdminClient adminClient) throws InterruptedException, ExecutionException {
+ ListTopicsResult listTopicsResult = adminClient.listTopics(new ListTopicsOptions().listInternal(true));
+ Set names = listTopicsResult.names().get();
+ names.forEach(name -> {
+ try {
+ Map map = adminClient.describeTopics(Collections.singleton(name)).all().get(3L, TimeUnit.SECONDS);
+ map.forEach((key, value) -> value.partitions().forEach(info -> extractedOffset(builder, adminClient, name, value, info)));
+ } catch (TimeoutException | InterruptedException | ExecutionException e) {
+ log.warn("Topic {} get offset fail", name);
+ }
+ });
+ }
+
+ private void extractedOffset(CollectRep.MetricsData.Builder builder, AdminClient adminClient, String name, TopicDescription value, TopicPartitionInfo info) {
+ try {
+ TopicPartition topicPartition = new TopicPartition(value.name(), info.partition());
+ long earliestOffset = getEarliestOffset(adminClient, topicPartition);
+ long latestOffset = getLatestOffset(adminClient, topicPartition);
+ CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder();
+ valueRowBuilder.addColumns(value.name());
+ valueRowBuilder.addColumns(String.valueOf(info.partition()));
+ valueRowBuilder.addColumns(String.valueOf(earliestOffset));
+ valueRowBuilder.addColumns(String.valueOf(latestOffset));
+ builder.addValues(valueRowBuilder.build());
+ } catch (TimeoutException | InterruptedException | ExecutionException e) {
+ log.warn("Topic {} get offset fail", name);
+ }
+ }
+
+ /**
+ * Get the earliest offset for a given topic partition
+ *
+ * @param adminClient The AdminClient
+ * @param topicPartition The TopicPartition
+ * @return The earliest offset
+ */
+ private long getEarliestOffset(AdminClient adminClient, TopicPartition topicPartition)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ return adminClient
+ .listOffsets(Collections.singletonMap(topicPartition, OffsetSpec.earliest()))
+ .all()
+ .get(3L, TimeUnit.SECONDS)
+ .get(topicPartition)
+ .offset();
+ }
+
+ /**
+ * Get the latest offset for a given topic partition
+ *
+ * @param adminClient The AdminClient
+ * @param topicPartition The TopicPartition
+ * @return The latest offset
+ */
+ private long getLatestOffset(AdminClient adminClient, TopicPartition topicPartition)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ return adminClient
+ .listOffsets(Collections.singletonMap(topicPartition, OffsetSpec.latest()))
+ .all()
+ .get(3L, TimeUnit.SECONDS)
+ .get(topicPartition)
+ .offset();
+ }
+
+ /**
+ * Collect the list of topics
+ *
+ * @param builder The MetricsData builder
+ * @param adminClient The AdminClient
+ */
+ private static void collectTopicList(CollectRep.MetricsData.Builder builder, AdminClient adminClient) throws InterruptedException, ExecutionException {
+ ListTopicsOptions options = new ListTopicsOptions().listInternal(true);
+ Set names = adminClient.listTopics(options).names().get();
+ names.forEach(name -> {
+ CollectRep.ValueRow valueRow = CollectRep.ValueRow.newBuilder().addColumns(name).build();
+ builder.addValues(valueRow);
+ });
+ }
+
+ /**
+ * Collect the description of each topic
+ *
+ * @param builder The MetricsData builder
+ * @param adminClient The AdminClient
+ */
+ private static void collectTopicDescribe(CollectRep.MetricsData.Builder builder, AdminClient adminClient) throws InterruptedException, ExecutionException {
+ ListTopicsOptions options = new ListTopicsOptions();
+ options.listInternal(true);
+ ListTopicsResult listTopicsResult = adminClient.listTopics(options);
+ Set names = listTopicsResult.names().get();
+ DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(names);
+ Map map = describeTopicsResult.all().get();
+ map.forEach((key, value) -> {
+ List listp = value.partitions();
+ listp.forEach(info -> {
+ CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder();
+ valueRowBuilder.addColumns(value.name());
+ valueRowBuilder.addColumns(String.valueOf(value.partitions().size()));
+ valueRowBuilder.addColumns(String.valueOf(info.partition()));
+ valueRowBuilder.addColumns(info.leader().host());
+ valueRowBuilder.addColumns(String.valueOf(info.leader().port()));
+ valueRowBuilder.addColumns(String.valueOf(info.replicas().size()));
+ valueRowBuilder.addColumns(String.valueOf(info.replicas()));
+ builder.addValues(valueRowBuilder.build());
+ });
+ });
+ }
+
+ @Override
+ public String supportProtocol() {
+ return DispatchConstants.PROTOCOL_KAFKA;
+ }
+}
\ No newline at end of file
diff --git a/collector/collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/KafkaConnect.java b/collector/collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/KafkaConnect.java
new file mode 100644
index 00000000000..2d0bbb11bb6
--- /dev/null
+++ b/collector/collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/KafkaConnect.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.collector.collect.kafka;
+
+import org.apache.hertzbeat.collector.collect.common.cache.AbstractConnection;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.KafkaAdminClient;
+
+import java.util.Properties;
+
+/**
+ * Kafka connection
+ */
+public class KafkaConnect extends AbstractConnection {
+
+
+ private static AdminClient adminClient;
+
+ public KafkaConnect(String brokerList) {
+ Properties properties = new Properties();
+ properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
+ properties.put(AdminClientConfig.RETRIES_CONFIG, 3);
+ properties.put(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, 500);
+ adminClient = KafkaAdminClient.create(properties);
+ }
+
+ @Override
+ public AdminClient getConnection() {
+ return adminClient;
+ }
+
+ @Override
+ public void closeConnection() throws Exception {
+ if (this.adminClient != null) {
+ this.adminClient.close();
+ }
+ }
+
+ public static synchronized AdminClient getAdminClient(String brokerList) {
+ if (adminClient == null) {
+ Properties properties = new Properties();
+ properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
+ adminClient = KafkaAdminClient.create(properties);
+ }
+ return adminClient;
+ }
+
+}
diff --git a/collector/collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/SupportedCommand.java b/collector/collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/SupportedCommand.java
new file mode 100644
index 00000000000..911a3529dc1
--- /dev/null
+++ b/collector/collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/SupportedCommand.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.collector.collect.kafka;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * SupportedCommand
+ */
+public enum SupportedCommand {
+
+ TOPIC_DESCRIBE("topic-describe"),
+ TOPIC_LIST("topic-list"),
+ TOPIC_OFFSET("topic-offset");
+
+ private static Set SUPPORTED_COMMAND = new HashSet<>();
+
+ static {
+ // O(1) complexity, using static to load all system placeholders
+ for (SupportedCommand placeholder : SupportedCommand.values()) {
+ SUPPORTED_COMMAND.add(placeholder.getCommand());
+ }
+ }
+
+ private final String key;
+
+ SupportedCommand(String command) {
+ this.key = command;
+ }
+
+ public String getCommand() {
+ return key;
+ }
+
+ public static boolean isKafkaCommand(String str) {
+ return SUPPORTED_COMMAND.contains(str);
+ }
+
+ public static SupportedCommand fromCommand(String command) {
+ for (SupportedCommand supportedCommand : SupportedCommand.values()) {
+ if (supportedCommand.getCommand().equals(command)) {
+ return supportedCommand;
+ }
+ }
+ throw new IllegalArgumentException("No enum constant for command: " + command);
+ }
+}
diff --git a/collector/collector-kafka/src/test/java/org/apache/hertzbeat/collector/collect/kafka/KafkaCollectTest.java b/collector/collector-kafka/src/test/java/org/apache/hertzbeat/collector/collect/kafka/KafkaCollectTest.java
new file mode 100644
index 00000000000..04c7676b158
--- /dev/null
+++ b/collector/collector-kafka/src/test/java/org/apache/hertzbeat/collector/collect/kafka/KafkaCollectTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.collector.collect.kafka;
+
+import org.apache.hertzbeat.collector.dispatch.DispatchConstants;
+import org.apache.hertzbeat.common.entity.job.Metrics;
+import org.apache.hertzbeat.common.entity.job.protocol.KafkaProtocol;
+import org.apache.hertzbeat.common.entity.message.CollectRep;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Test case for {@link KafkaCollectImpl}
+ */
+public class KafkaCollectTest {
+ private KafkaCollectImpl collect;
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ collect = new KafkaCollectImpl();
+ }
+
+ @Test
+ void preCheck() {
+ // metrics is null
+ assertThrows(NullPointerException.class, () -> {
+ collect.preCheck(null);
+ });
+
+ // kafka is null
+ assertThrows(IllegalArgumentException.class, () -> {
+ collect.preCheck(Metrics.builder().build());
+ });
+
+ // kafka srv host is null
+ assertThrows(IllegalArgumentException.class, () -> {
+ KafkaProtocol kafka = new KafkaProtocol();
+ collect.preCheck(Metrics.builder().kclient(kafka).build());
+ });
+
+ // kafka port is null
+ assertThrows(IllegalArgumentException.class, () -> {
+ KafkaProtocol kafka = KafkaProtocol.builder().host("127.0.0.1").build();
+ collect.preCheck(Metrics.builder().kclient(kafka).build());
+ });
+
+ // no exception throw
+ assertDoesNotThrow(() -> {
+ KafkaProtocol kafka = KafkaProtocol.builder().host("127.0.0.1").port("9092").build();
+ collect.preCheck(Metrics.builder().kclient(kafka).build());
+ });
+ }
+
+ @Test
+ void collect() {
+ // metrics is null
+ assertThrows(NullPointerException.class, () -> {
+ CollectRep.MetricsData.Builder builder = CollectRep.MetricsData.newBuilder();
+ collect.collect(builder, 1L, "app", null);
+ });
+
+ assertDoesNotThrow(() -> {
+ CollectRep.MetricsData.Builder builder = CollectRep.MetricsData.newBuilder();
+ KafkaProtocol kafka = KafkaProtocol.builder().host("127.0.0.1").port("9092").build();
+ Metrics metrics = Metrics.builder().kclient(kafka).build();
+ collect.collect(builder, 1L, "app", metrics);
+ });
+ }
+
+ @Test
+ void supportProtocol() {
+ assertEquals(DispatchConstants.PROTOCOL_KAFKA, collect.supportProtocol());
+ }
+}
diff --git a/collector/collector/pom.xml b/collector/collector/pom.xml
index e4e2801b1f8..172d3af8369 100644
--- a/collector/collector/pom.xml
+++ b/collector/collector/pom.xml
@@ -42,6 +42,13 @@
${hertzbeat.version}
+
+
+ org.apache.hertzbeat
+ hertzbeat-collector-kafka
+ ${hertzbeat.version}
+
+
org.apache.hertzbeat
@@ -63,6 +70,8 @@
${hertzbeat.version}
+
+
org.springframework.boot
diff --git a/collector/collector/src/main/resources/META-INF/services/org.apache.hertzbeat.collector.collect.AbstractCollect b/collector/collector/src/main/resources/META-INF/services/org.apache.hertzbeat.collector.collect.AbstractCollect
index 69b4076fe4f..1ece33cf384 100644
--- a/collector/collector/src/main/resources/META-INF/services/org.apache.hertzbeat.collector.collect.AbstractCollect
+++ b/collector/collector/src/main/resources/META-INF/services/org.apache.hertzbeat.collector.collect.AbstractCollect
@@ -26,3 +26,4 @@ org.apache.hertzbeat.collector.collect.nebulagraph.NgqlCollectImpl
org.apache.hertzbeat.collector.collect.imap.ImapCollectImpl
org.apache.hertzbeat.collector.collect.script.ScriptCollectImpl
org.apache.hertzbeat.collector.collect.mqtt.MqttCollectImpl
+org.apache.hertzbeat.collector.collect.kafka.KafkaCollectImpl
diff --git a/collector/pom.xml b/collector/pom.xml
index d1dccb9d625..039b37b67ce 100644
--- a/collector/pom.xml
+++ b/collector/pom.xml
@@ -40,6 +40,7 @@
collector-mongodb
collector-nebulagraph
collector-rocketmq
+ collector-kafka
@@ -59,6 +60,11 @@
hertzbeat-collector-mongodb
${hertzbeat.version}
+
+ org.apache.hertzbeat
+ hertzbeat-collector-kafka
+ ${hertzbeat.version}
+
org.apache.hertzbeat
hertzbeat-collector-nebulagraph
diff --git a/common/src/main/java/org/apache/hertzbeat/common/entity/job/Metrics.java b/common/src/main/java/org/apache/hertzbeat/common/entity/job/Metrics.java
index 37b96db3307..086c366a4be 100644
--- a/common/src/main/java/org/apache/hertzbeat/common/entity/job/Metrics.java
+++ b/common/src/main/java/org/apache/hertzbeat/common/entity/job/Metrics.java
@@ -23,6 +23,7 @@
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
@@ -36,6 +37,7 @@
import org.apache.hertzbeat.common.entity.job.protocol.ImapProtocol;
import org.apache.hertzbeat.common.entity.job.protocol.JdbcProtocol;
import org.apache.hertzbeat.common.entity.job.protocol.JmxProtocol;
+import org.apache.hertzbeat.common.entity.job.protocol.KafkaProtocol;
import org.apache.hertzbeat.common.entity.job.protocol.MemcachedProtocol;
import org.apache.hertzbeat.common.entity.job.protocol.MongodbProtocol;
import org.apache.hertzbeat.common.entity.job.protocol.MqttProtocol;
@@ -231,6 +233,11 @@ public class Metrics {
*/
private MqttProtocol mqtt;
+ /**
+ * Monitoring configuration information using the public kafka protocol
+ */
+ private KafkaProtocol kclient;
+
/**
* collector use - Temporarily store subTask metrics response data
*/
diff --git a/common/src/main/java/org/apache/hertzbeat/common/entity/job/protocol/KafkaProtocol.java b/common/src/main/java/org/apache/hertzbeat/common/entity/job/protocol/KafkaProtocol.java
new file mode 100644
index 00000000000..9603e1ffdd7
--- /dev/null
+++ b/common/src/main/java/org/apache/hertzbeat/common/entity/job/protocol/KafkaProtocol.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.common.entity.job.protocol;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Kafka protocol
+ */
+@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class KafkaProtocol {
+
+ /**
+ * IP ADDRESS OR DOMAIN NAME OF THE PEER HOST
+ */
+ private String host;
+
+ /**
+ * Port number
+ */
+ private String port;
+
+ /**
+ * TIME OUT PERIOD
+ */
+ private String timeout;
+
+ /**
+ * COMMAND
+ */
+ private String command;
+}
diff --git a/home/docs/help/kafka_client.md b/home/docs/help/kafka_client.md
new file mode 100644
index 00000000000..baedd1c4a64
--- /dev/null
+++ b/home/docs/help/kafka_client.md
@@ -0,0 +1,47 @@
+---
+id: kafka_client
+title: Monitoring: Kafka Monitoring (Client-based)
+sidebar_label: Kafka Monitoring (Client-based)
+keywords: [open-source monitoring system, open-source message middleware monitoring, Kafka monitoring]
+---
+
+> Collect and monitor general metrics for Kafka.
+
+### Configuration Parameters
+
+| Parameter Name | Help Description |
+|------------------|---------------------------------------------------------------|
+| Monitoring Host | The monitored peer's IPv4, IPv6, or domain name. Note: ⚠️ Do not include protocol headers (e.g., https://, http://). |
+| Monitoring Port | The monitored service port. |
+| Task Name | The identifier for this monitoring task, which must be unique. |
+| Collection Interval | The interval for periodic data collection, in seconds. The minimum allowable interval is 30 seconds. |
+| Description/Remarks | Additional information to describe and identify this monitoring task. Users can add remarks here. |
+
+### Collected Metrics
+
+#### Metric Set: topic_list
+
+| Metric Name | Unit | Help Description |
+|--------------|------|------------------|
+| TopicName | None | Topic Name |
+
+#### Metric Set: topic_detail
+
+| Metric Name | Unit | Help Description |
+|----------------------|------|------------------|
+| TopicName | None | Topic Name |
+| PartitionNum | None | Number of Partitions |
+| PartitionLeader | None | Partition Leader |
+| BrokerHost | None | Broker Host |
+| BrokerPort | None | Broker Port |
+| ReplicationFactorSize| None | Replication Factor Size |
+| ReplicationFactor | None | Replication Factor |
+
+#### Metric Set: topic_offset
+
+| Metric Name | Unit | Help Description |
+|---------------|------|------------------|
+| TopicName | None | Topic Name |
+| PartitionNum | None | Number of Partitions |
+| earliest | None | Earliest Offset |
+| latest | None | Latest Offset |
diff --git a/home/i18n/zh-cn/docusaurus-plugin-content-docs/current/help/kafka_client.md b/home/i18n/zh-cn/docusaurus-plugin-content-docs/current/help/kafka_client.md
new file mode 100644
index 00000000000..1ae63e03bf3
--- /dev/null
+++ b/home/i18n/zh-cn/docusaurus-plugin-content-docs/current/help/kafka_client.md
@@ -0,0 +1,47 @@
+---
+id: kafka_client
+title: 监控:Kafka监控(基于客户端)
+sidebar_label: Kafka监控(基于客户端)
+keywords: [开源监控系统, 开源消息中间件监控, Kafka监控]
+---
+
+> 对Kafka的通用指标进行采集监控
+
+### 配置参数
+
+| 参数名称 | 参数帮助描述 |
+|--------|------------------------------------------------------|
+| 监控Host | 被监控的对端IPV4,IPV6或域名。注意⚠️不带协议头(eg: https://, http://)。 |
+| 监控Port | 被监控的服务端口。 |
+| 任务名称 | 标识此监控的名称,名称需要保证唯一性。 |
+| 采集间隔 | 监控周期性采集数据间隔时间,单位秒,可设置的最小间隔为30秒 |
+| 描述备注 | 更多标识和描述此监控的备注信息,用户可以在这里备注信息 |
+
+### 采集指标
+
+#### 指标集合:topic_list
+
+| 指标名称 | 指标单位 | 指标帮助描述 |
+|-------------|------|---------|
+| TopicName | 无 | 主题名称 |
+
+#### 指标集合:topic_detail
+
+| 指标名称 | 指标单位 | 指标帮助描述 |
+|-----------|------|--------|
+| TopicName | 无 | 主题名称 |
+| PartitionNum | 无 | 分区数量 |
+| PartitionLeader | 无 | 分区领导者 |
+| BrokerHost | 无 | Broker主机 |
+| BrokerPort | 无 | Broker端口 |
+| ReplicationFactorSize | 无 | 复制因子大小 |
+| ReplicationFactor | 无 | 复制因子 |
+
+#### 指标集合:topic_offset
+
+| 指标名称 | 指标单位 | 指标帮助描述 |
+|-------|---|---------|
+| TopicName | 无 | 主题名称 |
+| PartitionNum | 无 | 分区数量 |
+| earliest | 无 | 最早偏移量 |
+| latest | 无 | 最新偏移量 |
diff --git a/manager/src/main/resources/define/app-kafka_client.yml b/manager/src/main/resources/define/app-kafka_client.yml
new file mode 100644
index 00000000000..5d063aec9dd
--- /dev/null
+++ b/manager/src/main/resources/define/app-kafka_client.yml
@@ -0,0 +1,168 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# The monitoring type category:service-application service monitoring db-database monitoring custom-custom monitoring os-operating system monitoring
+category: mid
+# The monitoring type eg: linux windows tomcat mysql aws...
+app: kafka_client
+# The monitoring i18n name
+name:
+ zh-CN: Kafka消息系统(客户端)
+ en-US: Kafka Message(Client)
+# The description and help of this monitoring type
+help:
+ zh-CN: HertzBeat 使用 Kafka Admin Client 对 Kafka 的通用指标进行采集监控。
+ en-US: HertzBeat uses Kafka Admin Client to monitoring kafka general metrics.
+ zh-TW: HertzBeat 使用 Kafka Admin Client 對 Kafka 的通用指標進行采集監控。
+helpLink:
+ zh-CN: https://hertzbeat.apache.org/zh-cn/docs/help/kafka_client
+ en-US: https://hertzbeat.apache.org/docs/help/kafka_client
+# Input params define for monitoring(render web ui by the definition)
+params:
+ # field-param field key
+ - field: host
+ # name-param field display i18n name
+ name:
+ zh-CN: 目标Host
+ en-US: Target Host
+ # type-param field type(most mapping the html input type)
+ type: host
+ # required-true or false
+ required: true
+ - field: port
+ name:
+ zh-CN: 端口
+ en-US: Port
+ type: number
+ # when type is number, range is required
+ range: '[0,65535]'
+ required: true
+ defaultValue: 9092
+
+# collect metrics config list
+metrics:
+ # metrics - server_info
+ - name: topic_list
+ i18n:
+ zh-CN: 主题列表
+ en-US: Topic List
+ # metrics scheduling priority(0->127)->(high->low), metrics with the same priority will be scheduled in parallel
+ # priority 0's metrics is availability metrics, it will be scheduled first, only availability metrics collect success will the scheduling continue
+ priority: 0
+ # collect metrics content
+ fields:
+ # field-metric name, type-metric type(0-number,1-string), unit-metric unit('%','ms','MB'), label-whether it is a metrics label field
+ - field: TopicName
+ type: 1
+ i18n:
+ zh-CN: 主题名称
+ en-US: Topic Name
+ # the protocol used for monitoring, eg: sql, ssh, http, telnet, wmi, snmp, sdk
+ protocol: kclient
+ # the config content when protocol is jmx
+ kclient:
+ host: ^_^host^_^
+ port: ^_^port^_^
+ command: topic-list
+ - name: topic_detail
+ i18n:
+ zh-CN: 主题详细信息
+ en-US: Topic Detail Info
+ # metrics scheduling priority(0->127)->(high->low), metrics with the same priority will be scheduled in parallel
+ # priority 0's metrics is availability metrics, it will be scheduled first, only availability metrics collect success will the scheduling continue
+ priority: 0
+ # collect metrics content
+ fields:
+ # field-metric name, type-metric type(0-number,1-string), unit-metric unit('%','ms','MB'), label-whether it is a metrics label field
+ - field: TopicName
+ type: 1
+ i18n:
+ zh-CN: 主题名称
+ en-US: Topic Name
+ - field: PartitionNum
+ type: 1
+ i18n:
+ zh-CN: 分区数量
+ en-US: Partition Num
+ - field: PartitionLeader
+ type: 1
+ i18n:
+ zh-CN: 分区领导者
+ en-US: Partition Leader
+ - field: BrokerHost
+ type: 1
+ i18n:
+ zh-CN: Broker主机
+ en-US: Broker Host
+ - field: BrokerPort
+ type: 1
+ i18n:
+ zh-CN: Broker端口
+ en-US: Broker Port
+ - field: ReplicationFactorSize
+ type: 1
+ i18n:
+ zh-CN: 复制因子大小
+ en-US: Replication Factor Size
+ - field: ReplicationFactor
+ type: 1
+ i18n:
+ zh-CN: 复制因子
+ en-US: Replication Factor
+ # the protocol used for monitoring, eg: sql, ssh, http, telnet, wmi, snmp, sdk
+ protocol: kclient
+ # the config content when protocol is jmx
+ kclient:
+ host: ^_^host^_^
+ port: ^_^port^_^
+ command: topic-describe
+ - name: topic_offset
+ i18n:
+ zh-CN: 主题偏移量
+ en-US: Topic Offset
+ # metrics scheduling priority(0->127)->(high->low), metrics with the same priority will be scheduled in parallel
+ # priority 0's metrics is availability metrics, it will be scheduled first, only availability metrics collect success will the scheduling continue
+ priority: 0
+ # collect metrics content
+ fields:
+ # field-metric name, type-metric type(0-number,1-string), unit-metric unit('%','ms','MB'), label-whether it is a metrics label field
+ - field: TopicName
+ type: 1
+ i18n:
+ zh-CN: 主题名称
+ en-US: Topic Name
+ - field: PartitionNum
+ type: 1
+ i18n:
+ zh-CN: 分区数量
+ en-US: Partition Num
+ - field: earliest
+ type: 0
+ i18n:
+ zh-CN: 最早偏移量
+ en-US: Earliest Offset
+ - field: latest
+ type: 0
+ i18n:
+ zh-CN: 最新偏移量
+ en-US: Latest Offset
+ # the protocol used for monitoring, eg: sql, ssh, http, telnet, wmi, snmp, sdk
+ protocol: kclient
+ # the config content when protocol is jmx
+ kclient:
+ host: ^_^host^_^
+ port: ^_^port^_^
+ command: topic-offset
+