From cc19f1af23313185fb266a775c065225fc8f92d5 Mon Sep 17 00:00:00 2001 From: zhangshenghang Date: Fri, 20 Sep 2024 21:05:34 +0800 Subject: [PATCH 1/4] init --- .../collector/dispatch/DispatchConstants.java | 5 + collector/collector-kafka/pom.xml | 34 +++ .../collect/kafka/KafkaCollectImpl.java | 206 ++++++++++++++++++ .../collector/collect/kafka/KafkaConnect.java | 64 ++++++ .../collect/kafka/SupportedCommand.java | 60 +++++ collector/collector/pom.xml | 9 + ...ertzbeat.collector.collect.AbstractCollect | 1 + collector/pom.xml | 6 + .../hertzbeat/common/entity/job/Metrics.java | 7 + .../entity/job/protocol/KafkaProtocol.java | 53 +++++ home/docs/help/kafka_client.md | 47 ++++ .../current/help/kafka_client.md | 48 ++++ .../resources/define/app-kafka_client.yml | 168 ++++++++++++++ 13 files changed, 708 insertions(+) create mode 100644 collector/collector-kafka/pom.xml create mode 100644 collector/collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/KafkaCollectImpl.java create mode 100644 collector/collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/KafkaConnect.java create mode 100644 collector/collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/SupportedCommand.java create mode 100644 common/src/main/java/org/apache/hertzbeat/common/entity/job/protocol/KafkaProtocol.java create mode 100644 home/docs/help/kafka_client.md create mode 100644 home/i18n/zh-cn/docusaurus-plugin-content-docs/current/help/kafka_client.md create mode 100644 manager/src/main/resources/define/app-kafka_client.yml diff --git a/collector/collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/DispatchConstants.java b/collector/collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/DispatchConstants.java index 8be5eab9bc0..c94016f0e8d 100644 --- a/collector/collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/DispatchConstants.java +++ b/collector/collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/DispatchConstants.java @@ -199,4 +199,9 @@ public interface DispatchConstants { String PARSE_PROM_QL = "PromQL"; String PARSE_PROM_QL_VECTOR = "vector"; String PARSE_PROM_QL_MATRIX = "matrix"; + + /** + * protocol kafka + */ + String PROTOCOL_KAFKA = "kclient"; } diff --git a/collector/collector-kafka/pom.xml b/collector/collector-kafka/pom.xml new file mode 100644 index 00000000000..71b946bd914 --- /dev/null +++ b/collector/collector-kafka/pom.xml @@ -0,0 +1,34 @@ + + + 4.0.0 + + org.apache.hertzbeat + hertzbeat-collector + 2.0-SNAPSHOT + + + hertzbeat-collector-kafka + ${project.artifactId} + + + 17 + 17 + UTF-8 + + + + + + org.apache.hertzbeat + hertzbeat-collector-common + provided + + + + org.apache.kafka + kafka-clients + + + \ No newline at end of file diff --git a/collector/collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/KafkaCollectImpl.java b/collector/collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/KafkaCollectImpl.java new file mode 100644 index 00000000000..ec6bd3e43dd --- /dev/null +++ b/collector/collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/KafkaCollectImpl.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hertzbeat.collector.collect.kafka; + +import lombok.extern.slf4j.Slf4j; +import org.apache.hertzbeat.collector.collect.AbstractCollect; +import org.apache.hertzbeat.collector.dispatch.DispatchConstants; +import org.apache.hertzbeat.common.entity.job.Metrics; +import org.apache.hertzbeat.common.entity.job.protocol.KafkaProtocol; +import org.apache.hertzbeat.common.entity.message.CollectRep; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.DescribeTopicsResult; +import org.apache.kafka.clients.admin.ListTopicsOptions; +import org.apache.kafka.clients.admin.ListTopicsResult; +import org.apache.kafka.clients.admin.OffsetSpec; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.clients.consumer.OffsetAndTimestamp; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionInfo; +import org.springframework.util.Assert; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +@Slf4j +public class KafkaCollectImpl extends AbstractCollect { + + @Override + public void preCheck(Metrics metrics) throws IllegalArgumentException { + KafkaProtocol kafkaProtocol = metrics.getKclient(); + // Ensure that metrics and kafkaProtocol are not null + Assert.isTrue(metrics != null && kafkaProtocol != null, "Kafka collect must have kafkaProtocol params"); + // Ensure that host and port are not empty + Assert.hasText(kafkaProtocol.getHost(), "Kafka Protocol host is required."); + Assert.hasText(kafkaProtocol.getPort(), "Kafka Protocol port is required."); + } + + @Override + public void collect(CollectRep.MetricsData.Builder builder, long monitorId, String app, Metrics metrics) { + try { + KafkaProtocol kafkaProtocol = metrics.getKclient(); + String command = kafkaProtocol.getCommand(); + boolean isKafkaCommand = SupportedCommand.isKafkaCommand(command); + if (!isKafkaCommand) { + log.error("Unsupported command: {}", command); + return; + } + + // Create AdminClient with the provided host and port + AdminClient adminClient = KafkaConnect.getAdminClient(kafkaProtocol.getHost() + ":" + kafkaProtocol.getPort()); + + // Execute the appropriate collection method based on the command + switch (SupportedCommand.fromCommand(command)) { + case TOPIC_DESCRIBE: + collectTopicDescribe(builder, adminClient); + break; + case TOPIC_LIST: + collectTopicList(builder, adminClient); + break; + case TOPIC_OFFSET: + collectTopicOffset(builder, adminClient); + break; + default: + log.error("Unsupported command: {}", command); + break; + } + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + + /** + * Collect the earliest and latest offsets for each topic + * @param builder The MetricsData builder + * @param adminClient The AdminClient + * @throws InterruptedException If the thread is interrupted + * @throws ExecutionException If an error occurs during execution + */ + private void collectTopicOffset(CollectRep.MetricsData.Builder builder, AdminClient adminClient) throws InterruptedException, ExecutionException { + ListTopicsResult listTopicsResult = adminClient.listTopics(new ListTopicsOptions().listInternal(true)); + Set names = listTopicsResult.names().get(); + names.forEach(name -> { + try { + Map map = adminClient.describeTopics(Collections.singleton(name)).all().get(3L, TimeUnit.SECONDS); + map.forEach((key, value) -> value.partitions().forEach(info -> extractedOffset(builder, adminClient, name, value, info))); + } catch (TimeoutException | InterruptedException | ExecutionException e) { + log.warn("Topic {} get offset fail", name); + } + }); + } + + private void extractedOffset(CollectRep.MetricsData.Builder builder, AdminClient adminClient, String name, TopicDescription value, TopicPartitionInfo info) { + try { + TopicPartition topicPartition = new TopicPartition(value.name(), info.partition()); + long earliestOffset = getEarliestOffset(adminClient, topicPartition); + long latestOffset = getLatestOffset(adminClient, topicPartition); + CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder(); + valueRowBuilder.addColumns(value.name()); + valueRowBuilder.addColumns(String.valueOf(info.partition())); + valueRowBuilder.addColumns(String.valueOf(earliestOffset)); + valueRowBuilder.addColumns(String.valueOf(latestOffset)); + builder.addValues(valueRowBuilder.build()); + } catch (TimeoutException | InterruptedException | ExecutionException e) { + log.warn("Topic {} get offset fail", name); + } + } + + /** + * Get the earliest offset for a given topic partition + * @param adminClient The AdminClient + * @param topicPartition The TopicPartition + * @return The earliest offset + */ + private long getEarliestOffset(AdminClient adminClient, TopicPartition topicPartition) + throws InterruptedException, ExecutionException, TimeoutException { + return adminClient + .listOffsets(Collections.singletonMap(topicPartition, OffsetSpec.earliest())) + .all() + .get(3L, TimeUnit.SECONDS) + .get(topicPartition) + .offset(); + } + + /** + * Get the latest offset for a given topic partition + * @param adminClient The AdminClient + * @param topicPartition The TopicPartition + * @return The latest offset + */ + private long getLatestOffset(AdminClient adminClient, TopicPartition topicPartition) + throws InterruptedException, ExecutionException, TimeoutException { + return adminClient + .listOffsets(Collections.singletonMap(topicPartition, OffsetSpec.latest())) + .all() + .get(3L, TimeUnit.SECONDS) + .get(topicPartition) + .offset(); + } + + /** + * Collect the list of topics + * @param builder The MetricsData builder + * @param adminClient The AdminClient + */ + private static void collectTopicList(CollectRep.MetricsData.Builder builder, AdminClient adminClient) throws InterruptedException, ExecutionException { + ListTopicsOptions options = new ListTopicsOptions().listInternal(true); + Set names = adminClient.listTopics(options).names().get(); + names.forEach(name -> { + CollectRep.ValueRow valueRow = CollectRep.ValueRow.newBuilder().addColumns(name).build(); + builder.addValues(valueRow); + }); + } + + /** + * Collect the description of each topic + * @param builder The MetricsData builder + * @param adminClient The AdminClient + */ + private static void collectTopicDescribe(CollectRep.MetricsData.Builder builder, AdminClient adminClient) throws InterruptedException, ExecutionException { + ListTopicsOptions options = new ListTopicsOptions(); + options.listInternal(true); + ListTopicsResult listTopicsResult = adminClient.listTopics(options); + Set names = listTopicsResult.names().get(); + DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(names); + Map map = describeTopicsResult.all().get(); + map.forEach((key, value) -> { + List listp = value.partitions(); + listp.forEach(info -> { + CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder(); + valueRowBuilder.addColumns(value.name()); + valueRowBuilder.addColumns(String.valueOf(value.partitions().size())); + valueRowBuilder.addColumns(String.valueOf(info.partition())); + valueRowBuilder.addColumns(info.leader().host()); + valueRowBuilder.addColumns(String.valueOf(info.leader().port())); + valueRowBuilder.addColumns(String.valueOf(info.replicas().size())); + valueRowBuilder.addColumns(String.valueOf(info.replicas())); + builder.addValues(valueRowBuilder.build()); + }); + }); + } + + @Override + public String supportProtocol() { + return DispatchConstants.PROTOCOL_KAFKA; + } +} \ No newline at end of file diff --git a/collector/collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/KafkaConnect.java b/collector/collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/KafkaConnect.java new file mode 100644 index 00000000000..9de4b60ad8f --- /dev/null +++ b/collector/collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/KafkaConnect.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hertzbeat.collector.collect.kafka; + +import org.apache.hertzbeat.collector.collect.common.cache.AbstractConnection; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.KafkaAdminClient; + +import java.util.Properties; + +/** + * Kafka connection + */ +public class KafkaConnect extends AbstractConnection { + + + private static AdminClient adminClient; + + public KafkaConnect(String brokerList) { + Properties properties = new Properties(); + properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); + properties.put(AdminClientConfig.RETRIES_CONFIG, 3); + properties.put(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, 500); + adminClient = KafkaAdminClient.create(properties); + } + + @Override + public AdminClient getConnection() { + return adminClient; + } + + @Override + public void closeConnection() throws Exception { + if (this.adminClient != null) { + this.adminClient.close(); + } + } + + public static synchronized AdminClient getAdminClient(String brokerList) { + if (adminClient == null) { + Properties properties = new Properties(); + properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); + adminClient = KafkaAdminClient.create(properties); + } + return adminClient; + } + +} diff --git a/collector/collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/SupportedCommand.java b/collector/collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/SupportedCommand.java new file mode 100644 index 00000000000..a5d83889f59 --- /dev/null +++ b/collector/collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/SupportedCommand.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hertzbeat.collector.collect.kafka; + +import java.util.HashSet; +import java.util.Set; + +public enum SupportedCommand { + + TOPIC_DESCRIBE("topic-describe"), + TOPIC_LIST("topic-list"), + TOPIC_OFFSET("topic-offset"); + + private static Set SUPPORTED_COMMAND = new HashSet<>(); + + static { + // O(1) complexity, using static to load all system placeholders + for (SupportedCommand placeholder : SupportedCommand.values()) { + SUPPORTED_COMMAND.add(placeholder.getCommand()); + } + } + + private final String key; + + SupportedCommand(String command) { + this.key = command; + } + + public String getCommand() { + return key; + } + + public static boolean isKafkaCommand(String str) { + return SUPPORTED_COMMAND.contains(str); + } + + public static SupportedCommand fromCommand(String command) { + for (SupportedCommand supportedCommand : SupportedCommand.values()) { + if (supportedCommand.getCommand().equals(command)) { + return supportedCommand; + } + } + throw new IllegalArgumentException("No enum constant for command: " + command); + } +} diff --git a/collector/collector/pom.xml b/collector/collector/pom.xml index e4e2801b1f8..172d3af8369 100644 --- a/collector/collector/pom.xml +++ b/collector/collector/pom.xml @@ -42,6 +42,13 @@ ${hertzbeat.version} + + + org.apache.hertzbeat + hertzbeat-collector-kafka + ${hertzbeat.version} + + org.apache.hertzbeat @@ -63,6 +70,8 @@ ${hertzbeat.version} + + org.springframework.boot diff --git a/collector/collector/src/main/resources/META-INF/services/org.apache.hertzbeat.collector.collect.AbstractCollect b/collector/collector/src/main/resources/META-INF/services/org.apache.hertzbeat.collector.collect.AbstractCollect index 69b4076fe4f..1ece33cf384 100644 --- a/collector/collector/src/main/resources/META-INF/services/org.apache.hertzbeat.collector.collect.AbstractCollect +++ b/collector/collector/src/main/resources/META-INF/services/org.apache.hertzbeat.collector.collect.AbstractCollect @@ -26,3 +26,4 @@ org.apache.hertzbeat.collector.collect.nebulagraph.NgqlCollectImpl org.apache.hertzbeat.collector.collect.imap.ImapCollectImpl org.apache.hertzbeat.collector.collect.script.ScriptCollectImpl org.apache.hertzbeat.collector.collect.mqtt.MqttCollectImpl +org.apache.hertzbeat.collector.collect.kafka.KafkaCollectImpl diff --git a/collector/pom.xml b/collector/pom.xml index d1dccb9d625..039b37b67ce 100644 --- a/collector/pom.xml +++ b/collector/pom.xml @@ -40,6 +40,7 @@ collector-mongodb collector-nebulagraph collector-rocketmq + collector-kafka @@ -59,6 +60,11 @@ hertzbeat-collector-mongodb ${hertzbeat.version} + + org.apache.hertzbeat + hertzbeat-collector-kafka + ${hertzbeat.version} + org.apache.hertzbeat hertzbeat-collector-nebulagraph diff --git a/common/src/main/java/org/apache/hertzbeat/common/entity/job/Metrics.java b/common/src/main/java/org/apache/hertzbeat/common/entity/job/Metrics.java index 37b96db3307..086c366a4be 100644 --- a/common/src/main/java/org/apache/hertzbeat/common/entity/job/Metrics.java +++ b/common/src/main/java/org/apache/hertzbeat/common/entity/job/Metrics.java @@ -23,6 +23,7 @@ import java.util.Objects; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; + import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; @@ -36,6 +37,7 @@ import org.apache.hertzbeat.common.entity.job.protocol.ImapProtocol; import org.apache.hertzbeat.common.entity.job.protocol.JdbcProtocol; import org.apache.hertzbeat.common.entity.job.protocol.JmxProtocol; +import org.apache.hertzbeat.common.entity.job.protocol.KafkaProtocol; import org.apache.hertzbeat.common.entity.job.protocol.MemcachedProtocol; import org.apache.hertzbeat.common.entity.job.protocol.MongodbProtocol; import org.apache.hertzbeat.common.entity.job.protocol.MqttProtocol; @@ -231,6 +233,11 @@ public class Metrics { */ private MqttProtocol mqtt; + /** + * Monitoring configuration information using the public kafka protocol + */ + private KafkaProtocol kclient; + /** * collector use - Temporarily store subTask metrics response data */ diff --git a/common/src/main/java/org/apache/hertzbeat/common/entity/job/protocol/KafkaProtocol.java b/common/src/main/java/org/apache/hertzbeat/common/entity/job/protocol/KafkaProtocol.java new file mode 100644 index 00000000000..9603e1ffdd7 --- /dev/null +++ b/common/src/main/java/org/apache/hertzbeat/common/entity/job/protocol/KafkaProtocol.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hertzbeat.common.entity.job.protocol; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * Kafka protocol + */ +@Data +@Builder +@AllArgsConstructor +@NoArgsConstructor +public class KafkaProtocol { + + /** + * IP ADDRESS OR DOMAIN NAME OF THE PEER HOST + */ + private String host; + + /** + * Port number + */ + private String port; + + /** + * TIME OUT PERIOD + */ + private String timeout; + + /** + * COMMAND + */ + private String command; +} diff --git a/home/docs/help/kafka_client.md b/home/docs/help/kafka_client.md new file mode 100644 index 00000000000..211cfbfe64b --- /dev/null +++ b/home/docs/help/kafka_client.md @@ -0,0 +1,47 @@ +--- +id: kafka_client +title: Monitoring: Kafka Monitoring (Client-based) +sidebar_label: Kafka Monitoring (Client-based) +keywords: [open-source monitoring system, open-source message middleware monitoring, Kafka monitoring] +--- + +> Collect and monitor general metrics for Kafka. + +### Configuration Parameters + +| Parameter Name | Help Description | +|------------------|---------------------------------------------------------------| +| Monitoring Host | The monitored peer's IPv4, IPv6, or domain name. Note: ⚠️ Do not include protocol headers (e.g., https://, http://). | +| Monitoring Port | The monitored service port. | +| Task Name | The identifier for this monitoring task, which must be unique. | +| Collection Interval | The interval for periodic data collection, in seconds. The minimum allowable interval is 30 seconds. | +| Description/Remarks | Additional information to describe and identify this monitoring task. Users can add remarks here. | + +### Collected Metrics + +#### Metric Set: topic_list + +| Metric Name | Unit | Help Description | +|--------------|------|------------------| +| TopicName | None | Topic Name | + +#### Metric Set: topic_detail + +| Metric Name | Unit | Help Description | +|----------------------|------|------------------| +| TopicName | None | Topic Name | +| PartitionNum | None | Number of Partitions | +| PartitionLeader | None | Partition Leader | +| BrokerHost | None | Broker Host | +| BrokerPort | None | Broker Port | +| ReplicationFactorSize| None | Replication Factor Size | +| ReplicationFactor | None | Replication Factor | + +#### Metric Set: topic_offset + +| Metric Name | Unit | Help Description | +|---------------|------|------------------| +| TopicName | None | Topic Name | +| PartitionNum | None | Number of Partitions | +| earliest | None | Earliest Offset | +| latest | None | Latest Offset | \ No newline at end of file diff --git a/home/i18n/zh-cn/docusaurus-plugin-content-docs/current/help/kafka_client.md b/home/i18n/zh-cn/docusaurus-plugin-content-docs/current/help/kafka_client.md new file mode 100644 index 00000000000..384cb8d5c5d --- /dev/null +++ b/home/i18n/zh-cn/docusaurus-plugin-content-docs/current/help/kafka_client.md @@ -0,0 +1,48 @@ +--- +id: kafka_client +title: 监控:Kafka监控(基于客户端) +sidebar_label: Kafka监控(基于客户端) +keywords: [开源监控系统, 开源消息中间件监控, Kafka监控] +--- + +> 对Kafka的通用指标进行采集监控 + +### 配置参数 + +| 参数名称 | 参数帮助描述 | +|--------|------------------------------------------------------| +| 监控Host | 被监控的对端IPV4,IPV6或域名。注意⚠️不带协议头(eg: https://, http://)。 | +| 监控Port | 被监控的服务端口。 | +| 任务名称 | 标识此监控的名称,名称需要保证唯一性。 | +| 采集间隔 | 监控周期性采集数据间隔时间,单位秒,可设置的最小间隔为30秒 | +| 描述备注 | 更多标识和描述此监控的备注信息,用户可以在这里备注信息 | + +### 采集指标 + +#### 指标集合:topic_list + +| 指标名称 | 指标单位 | 指标帮助描述 | +|-------------|------|---------| +| TopicName | 无 | 主题名称 | + +#### 指标集合:topic_detail + +| 指标名称 | 指标单位 | 指标帮助描述 | +|-----------|------|--------| +| TopicName | 无 | 主题名称 | +| PartitionNum | 无 | 分区数量 | +| PartitionLeader | 无 | 分区领导者 | +| BrokerHost | 无 | Broker主机 | +| BrokerPort | 无 | Broker端口 | +| ReplicationFactorSize | 无 | 复制因子大小 | +| ReplicationFactor | 无 | 复制因子 | + +#### 指标集合:topic_offset + +| 指标名称 | 指标单位 | 指标帮助描述 | +|-------|---|---------| +| TopicName | 无 | 主题名称 | +| PartitionNum | 无 | 分区数量 | +| earliest | 无 | 最早偏移量 | +| latest | 无 | 最新偏移量 | + diff --git a/manager/src/main/resources/define/app-kafka_client.yml b/manager/src/main/resources/define/app-kafka_client.yml new file mode 100644 index 00000000000..5d063aec9dd --- /dev/null +++ b/manager/src/main/resources/define/app-kafka_client.yml @@ -0,0 +1,168 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# The monitoring type category:service-application service monitoring db-database monitoring custom-custom monitoring os-operating system monitoring +category: mid +# The monitoring type eg: linux windows tomcat mysql aws... +app: kafka_client +# The monitoring i18n name +name: + zh-CN: Kafka消息系统(客户端) + en-US: Kafka Message(Client) +# The description and help of this monitoring type +help: + zh-CN: HertzBeat 使用 Kafka Admin Client 对 Kafka 的通用指标进行采集监控。 + en-US: HertzBeat uses Kafka Admin Client to monitoring kafka general metrics. + zh-TW: HertzBeat 使用 Kafka Admin Client 對 Kafka 的通用指標進行采集監控。 +helpLink: + zh-CN: https://hertzbeat.apache.org/zh-cn/docs/help/kafka_client + en-US: https://hertzbeat.apache.org/docs/help/kafka_client +# Input params define for monitoring(render web ui by the definition) +params: + # field-param field key + - field: host + # name-param field display i18n name + name: + zh-CN: 目标Host + en-US: Target Host + # type-param field type(most mapping the html input type) + type: host + # required-true or false + required: true + - field: port + name: + zh-CN: 端口 + en-US: Port + type: number + # when type is number, range is required + range: '[0,65535]' + required: true + defaultValue: 9092 + +# collect metrics config list +metrics: + # metrics - server_info + - name: topic_list + i18n: + zh-CN: 主题列表 + en-US: Topic List + # metrics scheduling priority(0->127)->(high->low), metrics with the same priority will be scheduled in parallel + # priority 0's metrics is availability metrics, it will be scheduled first, only availability metrics collect success will the scheduling continue + priority: 0 + # collect metrics content + fields: + # field-metric name, type-metric type(0-number,1-string), unit-metric unit('%','ms','MB'), label-whether it is a metrics label field + - field: TopicName + type: 1 + i18n: + zh-CN: 主题名称 + en-US: Topic Name + # the protocol used for monitoring, eg: sql, ssh, http, telnet, wmi, snmp, sdk + protocol: kclient + # the config content when protocol is jmx + kclient: + host: ^_^host^_^ + port: ^_^port^_^ + command: topic-list + - name: topic_detail + i18n: + zh-CN: 主题详细信息 + en-US: Topic Detail Info + # metrics scheduling priority(0->127)->(high->low), metrics with the same priority will be scheduled in parallel + # priority 0's metrics is availability metrics, it will be scheduled first, only availability metrics collect success will the scheduling continue + priority: 0 + # collect metrics content + fields: + # field-metric name, type-metric type(0-number,1-string), unit-metric unit('%','ms','MB'), label-whether it is a metrics label field + - field: TopicName + type: 1 + i18n: + zh-CN: 主题名称 + en-US: Topic Name + - field: PartitionNum + type: 1 + i18n: + zh-CN: 分区数量 + en-US: Partition Num + - field: PartitionLeader + type: 1 + i18n: + zh-CN: 分区领导者 + en-US: Partition Leader + - field: BrokerHost + type: 1 + i18n: + zh-CN: Broker主机 + en-US: Broker Host + - field: BrokerPort + type: 1 + i18n: + zh-CN: Broker端口 + en-US: Broker Port + - field: ReplicationFactorSize + type: 1 + i18n: + zh-CN: 复制因子大小 + en-US: Replication Factor Size + - field: ReplicationFactor + type: 1 + i18n: + zh-CN: 复制因子 + en-US: Replication Factor + # the protocol used for monitoring, eg: sql, ssh, http, telnet, wmi, snmp, sdk + protocol: kclient + # the config content when protocol is jmx + kclient: + host: ^_^host^_^ + port: ^_^port^_^ + command: topic-describe + - name: topic_offset + i18n: + zh-CN: 主题偏移量 + en-US: Topic Offset + # metrics scheduling priority(0->127)->(high->low), metrics with the same priority will be scheduled in parallel + # priority 0's metrics is availability metrics, it will be scheduled first, only availability metrics collect success will the scheduling continue + priority: 0 + # collect metrics content + fields: + # field-metric name, type-metric type(0-number,1-string), unit-metric unit('%','ms','MB'), label-whether it is a metrics label field + - field: TopicName + type: 1 + i18n: + zh-CN: 主题名称 + en-US: Topic Name + - field: PartitionNum + type: 1 + i18n: + zh-CN: 分区数量 + en-US: Partition Num + - field: earliest + type: 0 + i18n: + zh-CN: 最早偏移量 + en-US: Earliest Offset + - field: latest + type: 0 + i18n: + zh-CN: 最新偏移量 + en-US: Latest Offset + # the protocol used for monitoring, eg: sql, ssh, http, telnet, wmi, snmp, sdk + protocol: kclient + # the config content when protocol is jmx + kclient: + host: ^_^host^_^ + port: ^_^port^_^ + command: topic-offset + From abcc4a374e71c31e9013192132bd8e8c95b6a7ed Mon Sep 17 00:00:00 2001 From: zhangshenghang Date: Fri, 20 Sep 2024 21:45:37 +0800 Subject: [PATCH 2/4] init --- collector/collector-kafka/pom.xml | 16 ++++++++++++++++ home/docs/help/kafka_client.md | 2 +- .../current/help/kafka_client.md | 1 - 3 files changed, 17 insertions(+), 2 deletions(-) diff --git a/collector/collector-kafka/pom.xml b/collector/collector-kafka/pom.xml index 71b946bd914..9acd5c98d94 100644 --- a/collector/collector-kafka/pom.xml +++ b/collector/collector-kafka/pom.xml @@ -1,4 +1,20 @@ + diff --git a/home/docs/help/kafka_client.md b/home/docs/help/kafka_client.md index 211cfbfe64b..baedd1c4a64 100644 --- a/home/docs/help/kafka_client.md +++ b/home/docs/help/kafka_client.md @@ -44,4 +44,4 @@ keywords: [open-source monitoring system, open-source message middleware monitor | TopicName | None | Topic Name | | PartitionNum | None | Number of Partitions | | earliest | None | Earliest Offset | -| latest | None | Latest Offset | \ No newline at end of file +| latest | None | Latest Offset | diff --git a/home/i18n/zh-cn/docusaurus-plugin-content-docs/current/help/kafka_client.md b/home/i18n/zh-cn/docusaurus-plugin-content-docs/current/help/kafka_client.md index 384cb8d5c5d..1ae63e03bf3 100644 --- a/home/i18n/zh-cn/docusaurus-plugin-content-docs/current/help/kafka_client.md +++ b/home/i18n/zh-cn/docusaurus-plugin-content-docs/current/help/kafka_client.md @@ -45,4 +45,3 @@ keywords: [开源监控系统, 开源消息中间件监控, Kafka监控] | PartitionNum | 无 | 分区数量 | | earliest | 无 | 最早偏移量 | | latest | 无 | 最新偏移量 | - From 5b8441337acc9c9b14cfb195ba006d0ad0644f5d Mon Sep 17 00:00:00 2001 From: zhangshenghang Date: Fri, 20 Sep 2024 21:50:30 +0800 Subject: [PATCH 3/4] improve style --- .../collect/kafka/KafkaCollectImpl.java | 22 +++++++++++-------- .../collector/collect/kafka/KafkaConnect.java | 2 +- .../collect/kafka/SupportedCommand.java | 3 +++ 3 files changed, 17 insertions(+), 10 deletions(-) diff --git a/collector/collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/KafkaCollectImpl.java b/collector/collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/KafkaCollectImpl.java index ec6bd3e43dd..91b57e7a206 100644 --- a/collector/collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/KafkaCollectImpl.java +++ b/collector/collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/KafkaCollectImpl.java @@ -29,7 +29,6 @@ import org.apache.kafka.clients.admin.ListTopicsResult; import org.apache.kafka.clients.admin.OffsetSpec; import org.apache.kafka.clients.admin.TopicDescription; -import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartitionInfo; import org.springframework.util.Assert; @@ -91,10 +90,11 @@ public void collect(CollectRep.MetricsData.Builder builder, long monitorId, Stri /** * Collect the earliest and latest offsets for each topic - * @param builder The MetricsData builder + * + * @param builder The MetricsData builder * @param adminClient The AdminClient * @throws InterruptedException If the thread is interrupted - * @throws ExecutionException If an error occurs during execution + * @throws ExecutionException If an error occurs during execution */ private void collectTopicOffset(CollectRep.MetricsData.Builder builder, AdminClient adminClient) throws InterruptedException, ExecutionException { ListTopicsResult listTopicsResult = adminClient.listTopics(new ListTopicsOptions().listInternal(true)); @@ -127,7 +127,8 @@ private void extractedOffset(CollectRep.MetricsData.Builder builder, AdminClient /** * Get the earliest offset for a given topic partition - * @param adminClient The AdminClient + * + * @param adminClient The AdminClient * @param topicPartition The TopicPartition * @return The earliest offset */ @@ -143,7 +144,8 @@ private long getEarliestOffset(AdminClient adminClient, TopicPartition topicPart /** * Get the latest offset for a given topic partition - * @param adminClient The AdminClient + * + * @param adminClient The AdminClient * @param topicPartition The TopicPartition * @return The latest offset */ @@ -159,11 +161,12 @@ private long getLatestOffset(AdminClient adminClient, TopicPartition topicPartit /** * Collect the list of topics - * @param builder The MetricsData builder + * + * @param builder The MetricsData builder * @param adminClient The AdminClient */ private static void collectTopicList(CollectRep.MetricsData.Builder builder, AdminClient adminClient) throws InterruptedException, ExecutionException { - ListTopicsOptions options = new ListTopicsOptions().listInternal(true); + ListTopicsOptions options = new ListTopicsOptions().listInternal(true); Set names = adminClient.listTopics(options).names().get(); names.forEach(name -> { CollectRep.ValueRow valueRow = CollectRep.ValueRow.newBuilder().addColumns(name).build(); @@ -173,11 +176,12 @@ private static void collectTopicList(CollectRep.MetricsData.Builder builder, Adm /** * Collect the description of each topic - * @param builder The MetricsData builder + * + * @param builder The MetricsData builder * @param adminClient The AdminClient */ private static void collectTopicDescribe(CollectRep.MetricsData.Builder builder, AdminClient adminClient) throws InterruptedException, ExecutionException { - ListTopicsOptions options = new ListTopicsOptions(); + ListTopicsOptions options = new ListTopicsOptions(); options.listInternal(true); ListTopicsResult listTopicsResult = adminClient.listTopics(options); Set names = listTopicsResult.names().get(); diff --git a/collector/collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/KafkaConnect.java b/collector/collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/KafkaConnect.java index 9de4b60ad8f..2d0bbb11bb6 100644 --- a/collector/collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/KafkaConnect.java +++ b/collector/collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/KafkaConnect.java @@ -52,7 +52,7 @@ public void closeConnection() throws Exception { } } - public static synchronized AdminClient getAdminClient(String brokerList) { + public static synchronized AdminClient getAdminClient(String brokerList) { if (adminClient == null) { Properties properties = new Properties(); properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); diff --git a/collector/collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/SupportedCommand.java b/collector/collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/SupportedCommand.java index a5d83889f59..911a3529dc1 100644 --- a/collector/collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/SupportedCommand.java +++ b/collector/collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/SupportedCommand.java @@ -20,6 +20,9 @@ import java.util.HashSet; import java.util.Set; +/** + * SupportedCommand + */ public enum SupportedCommand { TOPIC_DESCRIBE("topic-describe"), From a80acfbc3353d15b17e258a370e2a0a631d6999a Mon Sep 17 00:00:00 2001 From: zhangshenghang Date: Fri, 20 Sep 2024 21:40:55 +0800 Subject: [PATCH 4/4] add test case --- .../collect/kafka/KafkaCollectImpl.java | 2 +- .../collect/kafka/KafkaCollectTest.java | 93 +++++++++++++++++++ 2 files changed, 94 insertions(+), 1 deletion(-) create mode 100644 collector/collector-kafka/src/test/java/org/apache/hertzbeat/collector/collect/kafka/KafkaCollectTest.java diff --git a/collector/collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/KafkaCollectImpl.java b/collector/collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/KafkaCollectImpl.java index 91b57e7a206..ac48029bd52 100644 --- a/collector/collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/KafkaCollectImpl.java +++ b/collector/collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/KafkaCollectImpl.java @@ -84,7 +84,7 @@ public void collect(CollectRep.MetricsData.Builder builder, long monitorId, Stri break; } } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException(e); + log.error("Kafka collect error", e); } } diff --git a/collector/collector-kafka/src/test/java/org/apache/hertzbeat/collector/collect/kafka/KafkaCollectTest.java b/collector/collector-kafka/src/test/java/org/apache/hertzbeat/collector/collect/kafka/KafkaCollectTest.java new file mode 100644 index 00000000000..04c7676b158 --- /dev/null +++ b/collector/collector-kafka/src/test/java/org/apache/hertzbeat/collector/collect/kafka/KafkaCollectTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hertzbeat.collector.collect.kafka; + +import org.apache.hertzbeat.collector.dispatch.DispatchConstants; +import org.apache.hertzbeat.common.entity.job.Metrics; +import org.apache.hertzbeat.common.entity.job.protocol.KafkaProtocol; +import org.apache.hertzbeat.common.entity.message.CollectRep; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +/** + * Test case for {@link KafkaCollectImpl} + */ +public class KafkaCollectTest { + private KafkaCollectImpl collect; + + @BeforeEach + public void setUp() throws Exception { + collect = new KafkaCollectImpl(); + } + + @Test + void preCheck() { + // metrics is null + assertThrows(NullPointerException.class, () -> { + collect.preCheck(null); + }); + + // kafka is null + assertThrows(IllegalArgumentException.class, () -> { + collect.preCheck(Metrics.builder().build()); + }); + + // kafka srv host is null + assertThrows(IllegalArgumentException.class, () -> { + KafkaProtocol kafka = new KafkaProtocol(); + collect.preCheck(Metrics.builder().kclient(kafka).build()); + }); + + // kafka port is null + assertThrows(IllegalArgumentException.class, () -> { + KafkaProtocol kafka = KafkaProtocol.builder().host("127.0.0.1").build(); + collect.preCheck(Metrics.builder().kclient(kafka).build()); + }); + + // no exception throw + assertDoesNotThrow(() -> { + KafkaProtocol kafka = KafkaProtocol.builder().host("127.0.0.1").port("9092").build(); + collect.preCheck(Metrics.builder().kclient(kafka).build()); + }); + } + + @Test + void collect() { + // metrics is null + assertThrows(NullPointerException.class, () -> { + CollectRep.MetricsData.Builder builder = CollectRep.MetricsData.newBuilder(); + collect.collect(builder, 1L, "app", null); + }); + + assertDoesNotThrow(() -> { + CollectRep.MetricsData.Builder builder = CollectRep.MetricsData.newBuilder(); + KafkaProtocol kafka = KafkaProtocol.builder().host("127.0.0.1").port("9092").build(); + Metrics metrics = Metrics.builder().kclient(kafka).build(); + collect.collect(builder, 1L, "app", metrics); + }); + } + + @Test + void supportProtocol() { + assertEquals(DispatchConstants.PROTOCOL_KAFKA, collect.supportProtocol()); + } +}