From 787b26ee43c1c7425ca2dd9b7e4fb0fa1d6cecc4 Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Fri, 3 Jan 2025 01:30:41 +0800 Subject: [PATCH] [CHECKER] add base code --- app/src/main/java/org/astraea/app/App.java | 9 +- .../org/astraea/app/checker/Changelog.java | 35 +++++++ .../java/org/astraea/app/checker/Checker.java | 93 +++++++++++++++++++ .../java/org/astraea/app/checker/Config.java | 42 +++++++++ .../java/org/astraea/app/checker/Guard.java | 28 ++++++ .../astraea/app/checker/ProduceRpcGuard.java | 44 +++++++++ .../org/astraea/app/checker/Protocol.java | 40 ++++++++ .../java/org/astraea/app/checker/Report.java | 52 +++++++++++ .../common/metrics/broker/NetworkMetrics.java | 22 +++++ 9 files changed, 359 insertions(+), 6 deletions(-) create mode 100644 app/src/main/java/org/astraea/app/checker/Changelog.java create mode 100644 app/src/main/java/org/astraea/app/checker/Checker.java create mode 100644 app/src/main/java/org/astraea/app/checker/Config.java create mode 100644 app/src/main/java/org/astraea/app/checker/Guard.java create mode 100644 app/src/main/java/org/astraea/app/checker/ProduceRpcGuard.java create mode 100644 app/src/main/java/org/astraea/app/checker/Protocol.java create mode 100644 app/src/main/java/org/astraea/app/checker/Report.java diff --git a/app/src/main/java/org/astraea/app/App.java b/app/src/main/java/org/astraea/app/App.java index 76cea37259..35bf314b6d 100644 --- a/app/src/main/java/org/astraea/app/App.java +++ b/app/src/main/java/org/astraea/app/App.java @@ -23,8 +23,7 @@ import java.util.Map; import org.astraea.app.automation.Automation; import org.astraea.app.benchmark.BalancerBenchmarkApp; -import org.astraea.app.homework.BulkChecker; -import org.astraea.app.homework.BulkSender; +import org.astraea.app.checker.Checker; import org.astraea.app.homework.Prepare; import org.astraea.app.homework.SendYourData; import org.astraea.app.performance.Performance; @@ -35,10 +34,8 @@ public class App { private static final Map> MAIN_CLASSES = Map.of( - "bulk_sender", - BulkSender.class, - "bulk_checker", - BulkChecker.class, + "40_checker", + Checker.class, "performance", Performance.class, "prepare", diff --git a/app/src/main/java/org/astraea/app/checker/Changelog.java b/app/src/main/java/org/astraea/app/checker/Changelog.java new file mode 100644 index 0000000000..e27ec0b46a --- /dev/null +++ b/app/src/main/java/org/astraea/app/checker/Changelog.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.app.checker; + +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +public class Changelog { + private List protocols; + private List configs; + + public Map protocols() { + return protocols.stream().collect(Collectors.toMap(Protocol::name, Function.identity())); + } + + public Map configs() { + return configs.stream().collect(Collectors.toMap(Config::name, Function.identity())); + } +} diff --git a/app/src/main/java/org/astraea/app/checker/Checker.java b/app/src/main/java/org/astraea/app/checker/Checker.java new file mode 100644 index 0000000000..e255dc81ac --- /dev/null +++ b/app/src/main/java/org/astraea/app/checker/Checker.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.app.checker; + +import com.beust.jcommander.Parameter; +import java.io.IOException; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.common.Node; +import org.astraea.app.argument.IntegerMapField; +import org.astraea.app.argument.NonEmptyStringField; +import org.astraea.app.argument.NonNegativeIntegerField; +import org.astraea.common.json.JsonConverter; +import org.astraea.common.json.TypeRef; +import org.astraea.common.metrics.JndiClient; +import org.astraea.common.metrics.MBeanClient; + +public class Checker { + + private static final List GUARDS = List.of(new ProduceRpcGuard()); + + public static void main(String[] args) throws Exception { + execute(Argument.parse(new Argument(), args)); + } + + public static void execute(final Argument param) throws Exception { + try (var admin = Admin.create(Map.of("bootstrap.servers", param.bootstrapServers()))) { + for (var guard : GUARDS) { + var result = guard.run(admin, param.mBeanClientFunction(), param.readChangelog()); + System.out.println(result); + } + } + } + + public static class Argument extends org.astraea.app.argument.Argument { + @Parameter( + names = {"--changelog"}, + description = "String: url of changelog file", + validateWith = NonEmptyStringField.class) + String changelog = + "https://raw.githubusercontent.com/opensource4you/astraea/refs/heads/main/config/kafka_changelog.json"; + + Changelog readChangelog() throws IOException { + try (var in = new URL(changelog).openStream()) { + return JsonConverter.defaultConverter() + .fromJson( + new String(in.readAllBytes(), StandardCharsets.UTF_8), TypeRef.of(Changelog.class)); + } + } + + @Parameter( + names = {"--jmx.port"}, + description = "Integer: the port to query JMX for each server", + validateWith = NonNegativeIntegerField.class, + converter = NonNegativeIntegerField.class) + int jmxPort = -1; + + @Parameter( + names = {"--jmx.ports"}, + description = + "Map: the JMX port for each broker. For example: 1024=19999 means for the broker with id 1024, its JMX port located at 19999 port", + validateWith = IntegerMapField.class, + converter = IntegerMapField.class) + Map jmxPorts = Map.of(); + + Function mBeanClientFunction() { + return node -> { + int port = jmxPorts.getOrDefault(node.id(), jmxPort); + if (port < 0) + throw new IllegalArgumentException("Failed to get jmx port for broker: " + node); + return JndiClient.of(node.host(), port); + }; + } + } +} diff --git a/app/src/main/java/org/astraea/app/checker/Config.java b/app/src/main/java/org/astraea/app/checker/Config.java new file mode 100644 index 0000000000..9670535444 --- /dev/null +++ b/app/src/main/java/org/astraea/app/checker/Config.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.app.checker; + +import java.util.Optional; + +public class Config { + private String name; + private Optional value; + private String commit; + private String kip; + + public String name() { + return name; + } + + public Optional value() { + return value; + } + + public String commit() { + return commit; + } + + public String kip() { + return kip; + } +} diff --git a/app/src/main/java/org/astraea/app/checker/Guard.java b/app/src/main/java/org/astraea/app/checker/Guard.java new file mode 100644 index 0000000000..5307919a52 --- /dev/null +++ b/app/src/main/java/org/astraea/app/checker/Guard.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.app.checker; + +import java.util.Collection; +import java.util.function.Function; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.common.Node; +import org.astraea.common.metrics.MBeanClient; + +public interface Guard { + Collection run(Admin admin, Function clients, Changelog changelog) + throws Exception; +} diff --git a/app/src/main/java/org/astraea/app/checker/ProduceRpcGuard.java b/app/src/main/java/org/astraea/app/checker/ProduceRpcGuard.java new file mode 100644 index 0000000000..ab4774b68b --- /dev/null +++ b/app/src/main/java/org/astraea/app/checker/ProduceRpcGuard.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.app.checker; + +import java.util.Collection; +import java.util.function.Function; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.common.Node; +import org.astraea.common.metrics.MBeanClient; +import org.astraea.common.metrics.broker.NetworkMetrics; + +public class ProduceRpcGuard implements Guard { + @Override + public Collection run( + Admin admin, Function clients, Changelog changelog) throws Exception { + return admin.describeCluster().nodes().get().stream() + .map( + node -> { + var protocol = + changelog + .protocols() + .get(NetworkMetrics.Request.PRODUCE.metricName().toLowerCase()); + if (protocol == null) return Report.empty(); + var versions = NetworkMetrics.Request.PRODUCE.versions(clients.apply(node)); + return Report.of(node, protocol, versions); + }) + .flatMap(Report::stream) + .toList(); + } +} diff --git a/app/src/main/java/org/astraea/app/checker/Protocol.java b/app/src/main/java/org/astraea/app/checker/Protocol.java new file mode 100644 index 0000000000..8783385380 --- /dev/null +++ b/app/src/main/java/org/astraea/app/checker/Protocol.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.app.checker; + +public class Protocol { + private String name; + private int base; + private String commit; + private String kip; + + public int base() { + return base; + } + + public String name() { + return name; + } + + public String commit() { + return commit; + } + + public String kip() { + return kip; + } +} diff --git a/app/src/main/java/org/astraea/app/checker/Report.java b/app/src/main/java/org/astraea/app/checker/Report.java new file mode 100644 index 0000000000..49b22216a1 --- /dev/null +++ b/app/src/main/java/org/astraea/app/checker/Report.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.app.checker; + +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.kafka.common.Node; + +public record Report(Node node, String why) { + static Report noMetrics(Node node) { + return new Report(node, "failed to get metrics from"); + } + + static Report of(Node node, String why) { + return new Report(node, why); + } + + static Report empty() { + return new Report(null, ""); + } + + static Report of(Node node, Protocol protocol, Set versions) { + var unsupportedVersions = + versions.stream().filter(v -> v < protocol.base()).collect(Collectors.toSet()); + if (unsupportedVersions.isEmpty()) return empty(); + return new Report( + node, + String.format( + "there are unsupported %s versions: %s due to new baseline: %s", + protocol.name(), unsupportedVersions, protocol.base())); + } + + Stream stream() { + if (why.isEmpty()) return Stream.empty(); + return Stream.of(this); + } +} diff --git a/common/src/main/java/org/astraea/common/metrics/broker/NetworkMetrics.java b/common/src/main/java/org/astraea/common/metrics/broker/NetworkMetrics.java index 45c6f48afe..083f75e244 100644 --- a/common/src/main/java/org/astraea/common/metrics/broker/NetworkMetrics.java +++ b/common/src/main/java/org/astraea/common/metrics/broker/NetworkMetrics.java @@ -20,6 +20,8 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; import org.astraea.common.EnumInfo; @@ -132,6 +134,26 @@ public Histogram fetch(MBeanClient mBeanClient) { return new Histogram(mBeanClient.bean(ALL.get(this))); } + public Set versions(MBeanClient mBeanClient) { + try { + var beanObjects = + mBeanClient.beans( + BeanQuery.builder() + .domainName("kafka.network") + .property("type", "RequestMetrics") + .property("request", "Produce") + .property("name", "RequestsPerSec") + .property("version", "*") + .build()); + return beanObjects.stream() + .map(b -> Integer.parseInt(b.properties().get("version"))) + .collect(Collectors.toSet()); + } catch (NoSuchElementException ignored) { + // this is expected if the node has no such request + return Set.of(); + } + } + public record Histogram(BeanObject beanObject) implements HasHistogram { public Request type() {