diff --git a/common/src/main/java/org/astraea/common/admin/TopicChecker.java b/common/src/main/java/org/astraea/common/admin/TopicChecker.java index 388abee7e8..532a478096 100644 --- a/common/src/main/java/org/astraea/common/admin/TopicChecker.java +++ b/common/src/main/java/org/astraea/common/admin/TopicChecker.java @@ -98,10 +98,10 @@ static TopicChecker skewPartition(double factor) { var min = clusterInfo.replicaLeaders(topic).stream() .mapToLong(Replica::size) - .max(); + .min(); return max.isPresent() && min.isPresent() - && ((double) min.getAsLong() / max.getAsLong() >= factor); + && ((double) min.getAsLong() / max.getAsLong() < factor); }) .collect(Collectors.toSet())); } diff --git a/common/src/test/java/org/astraea/common/admin/TopicCheckerTest.java b/common/src/test/java/org/astraea/common/admin/TopicCheckerTest.java index 842d99782c..c06811e205 100644 --- a/common/src/test/java/org/astraea/common/admin/TopicCheckerTest.java +++ b/common/src/test/java/org/astraea/common/admin/TopicCheckerTest.java @@ -115,8 +115,16 @@ void testNoData() { @Test void testSkewPartition() { + var singlePartitionTopic = Utils.randomString(); var topic = Utils.randomString(); try (var admin = Admin.of(service.bootstrapServers())) { + admin + .creator() + .topic(singlePartitionTopic) + .numberOfPartitions(1) + .run() + .toCompletableFuture() + .join(); admin.creator().topic(topic).numberOfPartitions(2).run().toCompletableFuture().join(); Utils.sleep(Duration.ofSeconds(2)); @@ -128,15 +136,19 @@ void testSkewPartition() { IntStream.range(0, 100) .forEach( ignored -> - producer - .send( + producer.send( + List.of( + Record.builder() + .topic(singlePartitionTopic) + .value("1".getBytes()) + .partition(0) + .build(), Record.builder() .topic(topic) .value("1".getBytes()) .partition(0) - .build()) - .toCompletableFuture() - .join()); + .build()))); + producer.flush(); } Assertions.assertEquals( diff --git a/gui/src/main/java/org/astraea/gui/Main.java b/gui/src/main/java/org/astraea/gui/Main.java index 3a26404cd4..05cb7fd1b4 100644 --- a/gui/src/main/java/org/astraea/gui/Main.java +++ b/gui/src/main/java/org/astraea/gui/Main.java @@ -23,12 +23,12 @@ import org.astraea.common.MapUtils; import org.astraea.gui.pane.Slide; import org.astraea.gui.tab.AboutNode; -import org.astraea.gui.tab.BalancerNode; import org.astraea.gui.tab.BrokerNode; import org.astraea.gui.tab.ClientNode; import org.astraea.gui.tab.ConnectorNode; import org.astraea.gui.tab.QuotaNode; import org.astraea.gui.tab.SettingNode; +import org.astraea.gui.tab.health.HealthNode; import org.astraea.gui.tab.topic.TopicNode; /** @@ -69,8 +69,8 @@ public void start(Stage stage) { ConnectorNode.of(context), "quota", QuotaNode.of(context), - "balancer", - BalancerNode.of(context), + "health", + HealthNode.of(context), "about", AboutNode.of(context))) .node(), diff --git a/gui/src/main/java/org/astraea/gui/tab/BalancerNode.java b/gui/src/main/java/org/astraea/gui/tab/health/BalancerNode.java similarity index 99% rename from gui/src/main/java/org/astraea/gui/tab/BalancerNode.java rename to gui/src/main/java/org/astraea/gui/tab/health/BalancerNode.java index 8afb4a7314..ab63d78842 100644 --- a/gui/src/main/java/org/astraea/gui/tab/BalancerNode.java +++ b/gui/src/main/java/org/astraea/gui/tab/health/BalancerNode.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.astraea.gui.tab; +package org.astraea.gui.tab.health; import java.time.Duration; import java.util.Arrays; @@ -56,7 +56,7 @@ import org.astraea.gui.text.EditableText; import org.astraea.gui.text.TextInput; -public class BalancerNode { +class BalancerNode { static final AtomicReference LAST_PLAN = new AtomicReference<>(); static final String TOPIC_NAME_KEY = "topic"; diff --git a/gui/src/main/java/org/astraea/gui/tab/health/HealthNode.java b/gui/src/main/java/org/astraea/gui/tab/health/HealthNode.java new file mode 100644 index 0000000000..9d62f68e58 --- /dev/null +++ b/gui/src/main/java/org/astraea/gui/tab/health/HealthNode.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.gui.tab.health; + +import java.time.Duration; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletionStage; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import javafx.geometry.Side; +import javafx.scene.Node; +import org.astraea.common.FutureUtils; +import org.astraea.common.MapUtils; +import org.astraea.common.admin.Admin; +import org.astraea.common.admin.Topic; +import org.astraea.common.admin.TopicChecker; +import org.astraea.common.admin.TopicConfigs; +import org.astraea.gui.Context; +import org.astraea.gui.pane.PaneBuilder; +import org.astraea.gui.pane.Slide; + +public class HealthNode { + + public static Node healthNode(Context context) { + return PaneBuilder.of() + .firstPart( + null, + List.of(), + "CHECK", + (argument, logger) -> + FutureUtils.combine( + badTopics(context.admin()), + unavailablePartitions(context.admin()), + (topics, partitions) -> { + var result = new LinkedHashMap>>(); + result.put("topic", topics); + result.put("partition", partitions); + return result; + })) + .build(); + } + + static CompletionStage>> badTopics(Admin admin) { + return FutureUtils.combine( + admin.topicNames(List.of(TopicChecker.NO_DATA)), + admin.topicNames(List.of(TopicChecker.NO_CONSUMER_GROUP)), + admin.topicNames( + List.of(TopicChecker.noWriteAfter(Duration.ofHours(1), Duration.ofSeconds(1)))), + admin.topicNames(List.of(TopicChecker.skewPartition(0.5))), + (noDataTopics, noConsumerTopics, noWriteTopics, skewTopics) -> + Stream.of( + noDataTopics.stream(), + noConsumerTopics.stream(), + noWriteTopics.stream(), + skewTopics.stream()) + .flatMap(s -> s) + .distinct() + .map( + name -> { + var r = new LinkedHashMap(); + r.put("topic", name); + r.put("empty", noDataTopics.contains(name)); + r.put("no consumer group", noConsumerTopics.contains(name)); + r.put("no write (1 hour)", noWriteTopics.contains(name)); + r.put("unbalanced", skewTopics.contains(name)); + return r; + }) + .collect(Collectors.toList())); + } + + static CompletionStage>> unavailablePartitions(Admin admin) { + return admin + .topicNames(true) + .thenCompose( + names -> + FutureUtils.combine( + admin.topics(names), + admin.partitions(names), + (topics, partitions) -> { + var minInSync = + topics.stream() + .collect( + Collectors.toMap( + Topic::name, + t -> + t.config() + .value(TopicConfigs.MIN_IN_SYNC_REPLICAS_CONFIG) + .map(Integer::parseInt) + .orElse(1))); + + return partitions.stream() + .filter( + p -> + p.isr().size() < minInSync.getOrDefault(p.topic(), 1) + || p.leader().isEmpty()) + .map( + p -> { + var r = new LinkedHashMap(); + r.put("topic", p.topic()); + r.put("partition", p.partition()); + r.put( + "leader", + p.leader().map(n -> String.valueOf(n.id())).orElse("null")); + r.put( + "in-sync replicas", + p.isr().stream() + .map(n -> String.valueOf(n.id())) + .collect(Collectors.joining(","))); + r.put( + TopicConfigs.MIN_IN_SYNC_REPLICAS_CONFIG, + minInSync.getOrDefault(p.topic(), 1)); + r.put("readable", p.leader().isPresent()); + r.put( + "writable", + p.leader().isPresent() + && p.isr().size() >= minInSync.getOrDefault(p.topic(), 1)); + return (Map) r; + }) + .collect(Collectors.toList()); + })); + } + + public static Node of(Context context) { + return Slide.of( + Side.TOP, + MapUtils.of("basic", healthNode(context), "balancer", BalancerNode.of(context))) + .node(); + } +} diff --git a/gui/src/main/java/org/astraea/gui/tab/topic/PartitionNode.java b/gui/src/main/java/org/astraea/gui/tab/topic/PartitionNode.java index 3364baafd4..c19d8d6831 100644 --- a/gui/src/main/java/org/astraea/gui/tab/topic/PartitionNode.java +++ b/gui/src/main/java/org/astraea/gui/tab/topic/PartitionNode.java @@ -39,7 +39,7 @@ import org.astraea.gui.text.EditableText; import org.astraea.gui.text.TextInput; -public class PartitionNode { +class PartitionNode { private static final String TOPIC_NAME_KEY = "topic"; private static final String PARTITION_KEY = "partition"; diff --git a/gui/src/main/java/org/astraea/gui/tab/topic/ReplicaNode.java b/gui/src/main/java/org/astraea/gui/tab/topic/ReplicaNode.java index 308253f629..ccf91394e2 100644 --- a/gui/src/main/java/org/astraea/gui/tab/topic/ReplicaNode.java +++ b/gui/src/main/java/org/astraea/gui/tab/topic/ReplicaNode.java @@ -40,7 +40,7 @@ import org.astraea.gui.text.EditableText; import org.astraea.gui.text.TextInput; -public class ReplicaNode { +class ReplicaNode { static final String TOPIC_NAME_KEY = "topic"; static final String PARTITION_KEY = "partition"; diff --git a/gui/src/main/java/org/astraea/gui/tab/topic/TopicNode.java b/gui/src/main/java/org/astraea/gui/tab/topic/TopicNode.java index 12257ac2a2..ab105c2f9e 100644 --- a/gui/src/main/java/org/astraea/gui/tab/topic/TopicNode.java +++ b/gui/src/main/java/org/astraea/gui/tab/topic/TopicNode.java @@ -41,7 +41,6 @@ import org.astraea.common.admin.NodeInfo; import org.astraea.common.admin.Partition; import org.astraea.common.admin.ProducerState; -import org.astraea.common.admin.Topic; import org.astraea.common.admin.TopicConfigs; import org.astraea.common.metrics.broker.HasRate; import org.astraea.common.metrics.broker.ServerMetrics; @@ -401,85 +400,6 @@ private static List> basicResult( .collect(Collectors.toList()); } - static List> emptyTopics(List partitions) { - return partitions.stream() - .collect( - Collectors.groupingBy( - Partition::topic, - Collectors.mapping(Partition::latestOffset, Collectors.reducing(0L, Long::sum)))) - .entrySet() - .stream() - .filter(e -> e.getValue() <= 0) - .map( - e -> - (Map) - MapUtils.of("topic", (Object) e.getKey(), "records", e.getValue())) - .collect(Collectors.toList()); - } - - static List> unavailablePartitions( - List topics, List partitions) { - var minInSync = - topics.stream() - .collect( - Collectors.toMap( - Topic::name, - t -> - t.config() - .value(TopicConfigs.MIN_IN_SYNC_REPLICAS_CONFIG) - .map(Integer::parseInt) - .orElse(1))); - - return partitions.stream() - .filter(p -> p.isr().size() < minInSync.getOrDefault(p.topic(), 1) || p.leader().isEmpty()) - .map( - p -> { - var r = new LinkedHashMap(); - r.put("topic", p.topic()); - r.put("partition", p.partition()); - r.put("leader", p.leader().map(n -> String.valueOf(n.id())).orElse("null")); - r.put( - "in-sync replicas", - p.isr().stream() - .map(n -> String.valueOf(n.id())) - .collect(Collectors.joining(","))); - r.put(TopicConfigs.MIN_IN_SYNC_REPLICAS_CONFIG, minInSync.getOrDefault(p.topic(), 1)); - r.put("readable", p.leader().isPresent()); - r.put( - "writable", - p.leader().isPresent() && p.isr().size() >= minInSync.getOrDefault(p.topic(), 1)); - return (Map) r; - }) - .collect(Collectors.toList()); - } - - public static Node healthNode(Context context) { - return PaneBuilder.of() - .firstPart( - null, - List.of(), - "CHECK", - (argument, logger) -> - context - .admin() - .topicNames(true) - .thenCompose( - names -> - FutureUtils.combine( - context.admin().topics(names), - context.admin().partitions(names), - (topics, partitions) -> { - var result = - new LinkedHashMap>>(); - result.put("empty topics", emptyTopics(partitions)); - result.put( - "unavailable partitions", - unavailablePartitions(topics, partitions)); - return result; - }))) - .build(); - } - public static Node of(Context context) { return Slide.of( Side.TOP, @@ -489,7 +409,6 @@ public static Node of(Context context) { "replica", ReplicaNode.of(context), "config", configNode(context), "metrics", metricsNode(context), - "health", healthNode(context), "create", createNode(context))) .node(); } diff --git a/gui/src/main/java/org/astraea/gui/table/TableViewer.java b/gui/src/main/java/org/astraea/gui/table/TableViewer.java index b588fec4f8..87ba4a6f7e 100644 --- a/gui/src/main/java/org/astraea/gui/table/TableViewer.java +++ b/gui/src/main/java/org/astraea/gui/table/TableViewer.java @@ -150,7 +150,7 @@ private void refresh() { allFilteredData = allData.entrySet().stream() .collect( - Collectors.toMap( + MapUtils.toLinkedHashMap( Map.Entry::getKey, e -> e.getValue().stream() @@ -174,9 +174,9 @@ private void refresh() { .collect(Collectors.toUnmodifiableList()))); var allTables = - allData.keySet().stream() + allFilteredData.keySet().stream() .collect( - Collectors.toMap( + MapUtils.toLinkedHashMap( Function.identity(), name -> { var table = new TableView>(); diff --git a/gui/src/test/java/org/astraea/gui/tab/BalancerNodeTest.java b/gui/src/test/java/org/astraea/gui/tab/health/BalancerNodeTest.java similarity index 99% rename from gui/src/test/java/org/astraea/gui/tab/BalancerNodeTest.java rename to gui/src/test/java/org/astraea/gui/tab/health/BalancerNodeTest.java index 0d90dde6e6..12be9a102f 100644 --- a/gui/src/test/java/org/astraea/gui/tab/BalancerNodeTest.java +++ b/gui/src/test/java/org/astraea/gui/tab/health/BalancerNodeTest.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.astraea.gui.tab; +package org.astraea.gui.tab.health; import java.time.Duration; import java.util.List; diff --git a/gui/src/test/java/org/astraea/gui/tab/topic/TopicNodeTest.java b/gui/src/test/java/org/astraea/gui/tab/health/HealthNodeTest.java similarity index 78% rename from gui/src/test/java/org/astraea/gui/tab/topic/TopicNodeTest.java rename to gui/src/test/java/org/astraea/gui/tab/health/HealthNodeTest.java index 6fbf4f914e..c4cb0a34ec 100644 --- a/gui/src/test/java/org/astraea/gui/tab/topic/TopicNodeTest.java +++ b/gui/src/test/java/org/astraea/gui/tab/health/HealthNodeTest.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.astraea.gui.tab.topic; +package org.astraea.gui.tab.health; import java.time.Duration; import java.util.List; @@ -28,7 +28,7 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -public class TopicNodeTest { +public class HealthNodeTest { private static final Service SERVICE = Service.builder().numberOfBrokers(3).build(); @@ -43,15 +43,10 @@ void testEmptyTopics() { try (var admin = Admin.of(SERVICE.bootstrapServers())) { admin.creator().topic(topic).numberOfPartitions(2).run().toCompletableFuture().join(); Utils.sleep(Duration.ofSeconds(2)); - var result = - TopicNode.emptyTopics( - admin - .partitions(admin.topicNames(false).toCompletableFuture().join()) - .toCompletableFuture() - .join()); + var result = HealthNode.badTopics(admin).toCompletableFuture().join(); Assertions.assertNotEquals(0, result.size()); Assertions.assertEquals(topic, result.get(0).get("topic")); - Assertions.assertEquals(0L, result.get(0).get("records")); + Assertions.assertTrue((Boolean) result.get(0).get("empty")); } } @@ -81,16 +76,7 @@ void testUnavailablePartitions() { Utils.sleep(Duration.ofSeconds(2)); - var result = - TopicNode.unavailablePartitions( - admin - .topics(admin.topicNames(false).toCompletableFuture().join()) - .toCompletableFuture() - .join(), - admin - .partitions(admin.topicNames(false).toCompletableFuture().join()) - .toCompletableFuture() - .join()); + var result = HealthNode.unavailablePartitions(admin).toCompletableFuture().join(); Assertions.assertEquals(1, result.size()); Assertions.assertEquals(topic, result.get(0).get("topic")); Assertions.assertFalse((boolean) result.get(0).get("writable"));